
    ti                       U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZ d dlmZ d d	lm Z m!Z!m"Z"m#Z# d d
l$m%Z% d dl&Z&d dl'Z&d dl(Z&d dl)m*Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1 d dl2m3Z3 d dl4m5Z5 d dl6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z?m@Z@mAZAmBZBmCZC d dlDmEZEmFZFmGZG  ej                  eI      ZJeJj                  ej                         g dZMddgZNe>xs e?xs eBZO G d de!      ZPi d ePdd      d ePdd      d ePdd      d ePd d!      d" ePd#d$      d% ePd&d'      d( ePd)d*      d+ ePd,d-      d. ePd/d0      d1 ePd2d3      d4 ePd5d6      d7 ePd8d9      d: ePd;d<      d= ePd>d?      d@ ePdAdB      dC ePdDdE      dF ePdGdH      dI ePdJdK      iZQe G dL dM             ZRdN ZSdO ZTdP ZUdQ ZVdR ZWdS ZXdT ZYdUeZfdVZ[dW Z\dXe]fdYZ^dZe d[e]dUe]fd\Z_d] Z`d^ Zad_ Zbd` Zcda Zddb Zedc Zfdd Zgde Zhdf ZiddgZjdh Zkdi Zl e:dj       ZmeZendk<   dl Zodmepeqdnf   fdoZrddpZsdq Ztdre&j                  dse]dte]dUeZfduZve;dvdwdx edyz      dxd{dxfd|       ZweArd}Zxn e] ej                  d~d            ZxddiZze@rdezd<   ddeZfdZ{dUe]fdZ|ed        Z}dde]de]de]fdZ~de]deqfdZdae"e
j                     end<   dde"eq   dUdfdZddZdZ G d deC      Z G d de      Zdeeqee    f   dede fdZej                  dUeZfd       Zd ZdexefdZ G d deC      Z G d de-j                        Z G d de-j                        Ze	 dd       Z G d de&j&                  j(                  j                        Z G d de      Z G d deC      Zy)    N)Callable)contextmanager)	dataclass)	timedelta)Enum)partialreducewraps)StringIO)Any
NamedTupleOptionalUnion)patch)
DeviceType)_SymmetricMemory)	trace_log)common_utils)FILE_SCHEMAfind_free_portIS_SANDCASTLELazyValretry_on_connect_failuresskip_but_pass_in_sandcastleskip_but_pass_in_sandcastle_if	TEST_CUDATEST_HPUTEST_WITH_ROCMTEST_WITH_TSANTEST_XPUTestCase)_install_threaded_pg_uninstall_threaded_pgProcessLocalGroupncclxcclhcclcudaxpuc                   "    e Zd ZU eed<   eed<   y)TestSkip	exit_codemessageN)__name__
__module____qualname__int__annotations__str     p/home/ubuntu/crypto_trading_bot/.venv/lib/python3.12/site-packages/torch/testing/_internal/common_distributed.pyr,   r,   D   s    NLr6   r,   backend_unavailableH   z5Skipped because distributed backend is not available.small_worldsizeI   z Skipped due to small world size.odd_worldsizeW   zSkipped due to odd world size.no_cudaJ   zCUDA is not available.zmulti-gpu-1K   zNeed at least 1 CUDA devicezmulti-gpu-2M   zNeed at least 2 CUDA deviceszmulti-gpu-3P   zNeed at least 3 CUDA deviceszmulti-gpu-4Q   zNeed at least 4 CUDA deviceszmulti-gpu-5R   zNeed at least 5 CUDA deviceszmulti-gpu-6S   zNeed at least 6 CUDA deviceszmulti-gpu-7T   zNeed at least 7 CUDA deviceszmulti-gpu-8U   zNeed at least 8 CUDA devicesr&   L   z#c10d not compiled with NCCL support
skipIfRocmN   zTest skipped for ROCmno_peer_accessO   z'Test skipped because no GPU peer accessgenericV   zHTest skipped at subprocess level, look at subprocess log for skip reasonimporterrorX   z"Test skipped due to missing importno_acceleratorY   zaccelerator is not available.c                       e Zd Zi Zh ded<    e       ed<   h ded<   h ded<   i Zh ded<   h ded	<   h ded
<   h ded<    e       ed<   erdhed<   erdhed<   yy)DistTestCases>   mpiuccr&   r'   allgather_coalescedr	   >   rV   r&   r'   zsendrecv anysourcezcpu barrier>   rV   gloor&   gpur)   ddpsubgrouppluginr(   hpur'   r*   N)r/   r0   r1   skip_collectivesetbackend_featurer   r    r5   r6   r7   rT   rT   c   s     O-KO)* #OH,CO()%<OM" O4OE5OF4OE"9OJ #OH"("( r6   rT   c                     | t         v S N)DDP_RANK_DEVICES)devices    r7   requires_ddp_rankre   y   s    %%%r6   c                 .     t                fd       }|S )zSkips if the world size exceeds the number of GPUs, ensuring that if the
    test is run, each rank has its own GPU via ``torch.cuda.device(rank)``.c                     t         s2t        s,t        s&t        j                  t
        d   j                         t        t        j                  d         }t         rJt        j                  j                         |k  r)t        j                  t
        d|    j                         t        rJt        j                  j                         |k  r)t        j                  t
        d|    j                         t        rJt        j                  j                         |k  r)t        j                  t
        d|    j                          | i |S )Nr>   
WORLD_SIZE
multi-gpu-)r   r   r    sysexit
TEST_SKIPSr-   r2   osenvirontorchr)   device_countr]   r*   )argskwargs
world_sizefuncs      r7   wrapperzskip_if_no_gpu.<locals>.wrapper   s    XHHZ	*445L12
002Z?HHZ*ZL 9:DDE		..0:=HHZ*ZL 9:DDE		..0:=HHZ*ZL 9:DDET$V$$r6   r
   rt   ru   s   ` r7   skip_if_no_gpurx   }   s"     4[% % Nr6   c                 .     t                fd       }|S )Nc                      t         j                  d   dk7  rEt        t         j                  d         dk  r&t        j                  t
        d   j                          | i |S )NBACKENDrU   rh      r:   rm   rn   r2   rj   rk   rl   r-   rq   rr   rt   s     r7   ru   z(skip_if_small_worldsize.<locals>.wrapper   sR    JJy!U*BJJ|4L0MPQ0QHHZ 12<<=T$V$$r6   rv   rw   s   ` r7   skip_if_small_worldsizer           
4[% % Nr6   c                 .     t                fd       }|S )Nc                      t         j                  d   dk7  rHt        t         j                  d         dz  dk(  r&t        j                  t
        d   j                          | i |S )Nr{   rU   rh         r<   r}   r~   s     r7   ru   z&skip_if_odd_worldsize.<locals>.wrapper   sW    JJy!U*BJJ|4L0MPQ0QUV0VHHZ0::;T$V$$r6   rv   rw   s   ` r7   skip_if_odd_worldsizer      r   r6   c                       fd}|S )Nc                 4     t                fd       }|S )Nc                      dk(  rKt         j                  j                         k  r*t        j                  t
        d    j                         y  | i |S Nr&   ri   )ro   r)   rp   rj   rk   rl   r-   )rq   rr   backendrt   ns     r7   ru   zCrequire_n_gpus_for_nccl_backend.<locals>.decorator.<locals>.wrapper   sM    & UZZ%<%<%>%Bj$45??@T,V,,r6   rv   )rt   ru   r   r   s   ` r7   	decoratorz2require_n_gpus_for_nccl_backend.<locals>.decorator   s     	t	- 
	- r6   r5   )r   r   r   s   `` r7   require_n_gpus_for_nccl_backendr      s     r6   c                      d } | S )Nc                 .     t                fd       }|S )Nc                      	 ddl m}m}  | i |S # t        $ r) t	        j
                  t        d   j                         Y y w xY w)Nr   )AutoModelForMaskedLM
BertConfigrO   )transformersr   r   ImportErrorrj   rk   rl   r-   )rq   rr   r   r   rt   s       r7   ru   z?import_transformers_or_skip.<locals>.decorator.<locals>.wrapper   sA    >IT,V,, >M2<<=>s    /AArv   rw   s   ` r7   r   z.import_transformers_or_skip.<locals>.decorator   s     	t	> 
	> r6   r5   )r   s    r7   import_transformers_or_skipr      s    
 r6   c                     t         r"t        j                  j                         | k\  ryt        r"t        j
                  j                         | k\  ryt        r"t        j                  j                         | k\  ryyNTF)r   ro   r)   rp   r   r]   r    r*   )xs    r7   at_least_x_gpur      sS    UZZ,,.!3EII**,1EII**,1r6   returnc                 V    t        | d   dd       }t        |       dk(  s|y ||       y)Nr   _handle_test_skipFT)getattrlen)rq   msgr   s      r7   _maybe_handle_skip_if_lt_x_gpur      s5    Q)<dC
4yA~*2cr6   c                       fd}|S )Nc                 2     t                fd       }|S )Nc                     t         j                  j                         r)t         j                  j                         k\  r | i |S t        r)t         j
                  j                         k\  r | i |S t        r)t         j                  j                         k\  r | i |S t        d    }t        | |j                        s t        j                  |j                         y y )Nri   )ro   r)   is_availablerp   r   r]   r    r*   rl   r   r.   rj   rk   r-   )rq   rr   	test_skiprt   r   s      r7   ru   z4skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper   s    zz&&(UZZ-D-D-F!-KT,V,,EII2249T,V,,EII2249T,V,,"Zs#34I1$	8I8IJ,,- Kr6   rv   )rt   ru   r   s   ` r7   r   z#skip_if_lt_x_gpu.<locals>.decorator   s     	t		. 
		. r6   r5   )r   r   s   ` r7   skip_if_lt_x_gpur      s     r6   r   c                       fd}|S )aR  
    Decorator to request a specific world size for a test. The test harness can
    read this attribute to set the number of ranks to spawn. If there are fewer
    than `n` CUDA devices available, the test should be skipped by the harness.

    Usage:
        @require_world_size(3)
        def test_something(self):
            ...
    c                     | _         t        j                  j                         } t	        j
                  |k\  d d|       |       S )Nz	requires z GPUs, found )_required_world_sizero   r)   rp   unittest
skipUnless)rt   	availabler   s     r7   r   z&requires_world_size.<locals>.decorator   sR    $%!JJ++-	
x""Nis-	{C

 	r6   r5   )r   r   s   ` r7   requires_world_sizer      s     r6   objdefaultc                     	 t        | d      r%t        | j                        r| j                         n| j                  }t	        | |      }|j
                  }t        |      S # t        $ r |cY S w xY w)z
    Returns the requested world size for the currently running unittest method on `obj`
    if annotated via `@require_world_size(n)`, else returns `default`.
    _current_test_name)hasattrcallabler   _testMethodNamer   r   r2   	Exception)r   r   	test_namefnvalues        r7   get_required_world_sizer     st    
 s01hs?U?U6V ""$$$ 	
 S)$''5z s   AA" "A0/A0c                       fd}|S )Nc                 4     t                fd       }|S )Nc                  2   dk7  r | i |S t         j                  j                         r)t         j                  j                         k\  r | i |S t        d    }t        | |j                        s t        j                  |j                         y y r   )
ro   r)   r   rp   rl   r   r.   rj   rk   r-   )rq   rr   r   r   rt   r   s      r7   ru   z9nccl_skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapper  s    & T,V,,zz&&(UZZ-D-D-F!-KT,V,,"Zs#34I1$	8I8IJ,,- Kr6   rv   )rt   ru   r   r   s   ` r7   r   z(nccl_skip_if_lt_x_gpu.<locals>.decorator  s     	t	. 
	. r6   r5   )r   r   r   s   `` r7   nccl_skip_if_lt_x_gpur     s     r6   c                     | j                         }d|v sJ d|v sJ d|v sJ |d   }|j                  d      dk(  r|n|j                  d      d   }||v sJ d| d|        y )	N	iteration	has_errorerrorz
Exception raised from r   zDid not find expected z in ddp logging data error: )_get_ddp_logging_datafindsplit)	model_DDP
err_substrddp_logging_datalogging_erractuals        r7   verify_ddp_error_loggedr   ,  s     668********&&&&"7+K ??56"< 	89!< 
 [  
 (D[MR r6   c                 .     t                fd       }|S )aJ  
    Convenience decorator to set/unset TORCH_NCCL_BLOCKING_WAIT flag. Note that use of
    this decorator will override the setting of TORCH_NCCL_ASYNC_ERROR_HANDLING for
    the particular test. After the test, both TORCH_NCCL_BLOCKING_WAIT and
    TORCH_NCCL_ASYNC_ERROR_HANDLING will be restored to their original values.
    c                     	 t         j                  d   }t         j                  d= 	 t         j                  d   }dt         j                  d<   	  | i |}|||t         j                  d<   ||t         j                  d<   S S # t        $ r d }Y jw xY w# t        $ r d }Y gw xY w# dt         j                  d<   w xY w# ||t         j                  d<   ||t         j                  d<   w w xY w)NTORCH_NCCL_ASYNC_ERROR_HANDLINGTORCH_NCCL_BLOCKING_WAIT1)rm   rn   KeyError)rq   rr    cached_nccl_async_error_handlingcached_nccl_blocking_waitretrt   s        r7   ru   z(with_nccl_blocking_wait.<locals>.wrapperF  s   	4AC1B, 

<=	9:<***;% 69BJJ12	S''C 0;4 

<= )49R

56 51  	4/3,	4  	-(,%	- 69BJJ12 0;4 

<= )49R

56 5s@   $B B 	B> BBB# B& "B##B& &B;>-C+rv   rw   s   ` r7   with_nccl_blocking_waitr   >  s%     4[ S  SD Nr6   c                       fd}|S )zK
    Runs a test for each distributed debug level specified in levels.
    c                 2     t                fd       }|S )Nc                     t         j                  j                  dd       }D ][  }|t         j                  d<   t        j                           | i |}t        j
                          |I|t         j                  d<   ] S )NTORCH_DISTRIBUTED_DEBUG)rm   rn   getc10dset_debug_level_from_envbarrier)rq   rr   	old_levellevelr   rt   levelss        r7   ru   z:with_dist_debug_levels.<locals>.decorator.<locals>.wrapperr  sx    

'@$GI F8=

45--/D+F+(<EBJJ89F Jr6   rv   )rt   ru   r   s   ` r7   r   z)with_dist_debug_levels.<locals>.decoratorq  s     	t	 
	 r6   r5   )r   r   s   ` r7   with_dist_debug_levelsr   l  s    
$ r6   c                  @    t        t        j                          d      S )Nz+c10d was not compiled with the Gloo backend)r   r   is_gloo_availabler5   r6   r7   requires_gloor     !    )""$$5 r6   c           	         t         sd S t        j                         st        d      S t	        t
        j                  j                  j                         | k  d|  dt
        j                  j                  j                          d|       S )Nc                     | S rb   r5   )fs    r7   <lambda>z'requires_nccl_version.<locals>.<lambda>  s     r6   +c10d was not compiled with the NCCL backendz0Requires NCCL version greater than or equal to: z	, found: z
, reason: )	r   r   is_nccl_availabler   r   ro   r)   r&   version)r   r   s     r7   requires_nccl_versionr     s    !!#*9
 	
 .JJOO##%/>wiyQVQ[Q[Q`Q`QhQhQjPkkuvyuz{
 	
r6   c                      t        dd      S )zK
    Require NCCL shrink support (NCCL available and version >= 2.27).
    )r      z Need NCCL 2.27+ for shrink_group)r   r5   r6   r7   requires_nccl_shrinkr     s     !*LMMr6   c                  @    t        t        j                          d      S )Nr   )r   r   r   r5   r6   r7   requires_ncclr     r   r6   c                  @    t        t        j                          d      S )Nz*c10d was not compiled with the UCC backend)r   r   is_ucc_availabler5   r6   r7   requires_uccr     !    )!!##4 r6   c                  @    t        t        j                          d      S )Nz*c10d was not compiled with the MPI backend)r   r   is_mpi_availabler5   r6   r7   requires_mpir     r   r6   c                 V    | t         } t        d | D              }t        | d|        S )a  
    Decorator to skip tests if no accelerator communication backend (NCCL, XCCL, HCCL) is available.

    Args:
        backends (Optional[List[str]]): Specific accelerator backends to check (e.g., ["nccl", "xccl", "hccl"]).
                                       If None, checks all supported accelerator backends (NCCL, XCCL, HCCL).

    Returns:
        callable: A decorator that skips the test if no specified accelerator backend is available.
    c              3      K   | ]=  }	 t        j                  t         j                  d  dj                  |d               ? yw)c                      t         S rb   )r   r5   r6   r7   r   z=requires_accelerator_dist_backend.<locals>.<genexpr>.<lambda>  s    H r6   r%   c                       yNFr5   r5   r6   r7   r   z=requires_accelerator_dist_backend.<locals>.<genexpr>.<lambda>  s    r6   N)r   r   is_xccl_availabler   ).0r   s     r7   	<genexpr>z4requires_accelerator_dist_backend.<locals>.<genexpr>  sG       	&****$	
 #g}
%		(s   AAz5No accelerator communication backend available among )ACCELERATOR_DIST_BACKENDSanyr   )backendsbackend_availables     r7   !requires_accelerator_dist_backendr    sH     ,     *
?zJ r6   c                      t         j                  j                         xr$ t        j                  t
        j                  d      } t        |  d      S )Nr   z"multicast support is not available)ro   r)   r   r   has_multicast_supportr   CUDAr   )r  s    r7   requires_multicast_supportr
    sI    

! 	G22:??AF  *!!, r6   c                      t         r@t        r9ddg} | D ]/  }|t        j                  j	                  d      j
                  v s/ y yyy)Ngfx942gfx950r   TF)r   r   ro   r)   get_device_propertiesgcnArchName)	arch_listarchs     r7   #evaluate_platform_supports_symm_memr    sL    !8,I!  5::;;A>JJJ  r6   c                      t               S rb   )r  r5   r6   r7   r   r     s
    /1 r6   PLATFORM_SUPPORTS_SYMM_MEMc                 d     t        j                  t        t        d   j                        |       S )z&Skips a test for ROCm multiprocess UTsrI   )r   skipIfr   rl   r.   )rt   s    r7   skip_if_rocm_multiprocessr    s%    L8??>:l+C+K+KLTRRr6   r  .c                       fd}|S )z4Skips a test for given ROCm archs - multiprocess UTsc                     d }t         rDt        j                  j                  d      j                  j                  d      d   }|v rd } t        j                  |d u|      |       S )Nr   :z0skip_if_rocm_arch_multiprocess: test skipped on )r   ro   r)   r  r  r   r   r  )rt   reasonpropr  s      r7   r   z1skip_if_rocm_arch_multiprocess.<locals>.decorator  se    ::33A6BBHHMaPDt|KD6R:xvT16:4@@r6   r5   )r  r   s   ` r7   skip_if_rocm_arch_multiprocessr    s    A r6   c                       fd}|S )z:Skips a test for ROCm based on ROCm ver - multiprocess UTsc                 :   d }t         rut        t        j                  j                        }|j                  dd      d   }t        d |j                  d      D              }||t              k  r	d| d d	} t        j                  |d u|      |       S )
N-r   maxsplitr   c              3   2   K   | ]  }t        |        y wrb   )r2   )r   r   s     r7   r  zLskip_if_rocm_ver_lessthan_multiprocess.<locals>.decorator.<locals>.<genexpr>  s     &O!s1v&Os   .z-skip_if_rocm_ver_lessthan_multiprocess: ROCm z is available but z	 required)	r   r4   ro   r   hipr   tupler   r  )rt   r  rocm_versionrocm_version_tupler   s       r7   r   z9skip_if_rocm_ver_lessthan_multiprocess.<locals>.decorator	  s    u}}001L'--cA->qAL!&&O|7I7I#7N&O!O"*?%g6HI[H\\novnw  xA  B:xvT16:4@@r6   r5   )r   r   s   ` r7   &skip_if_rocm_ver_lessthan_multiprocessr)    s    A r6   c                  <    t        t        j                  dk(  d      S )Nwin32z8This unit test case is not supported on Windows platform)r   rj   platformr5   r6   r7   skip_if_win32r-    s    )B r6   rd   majorminorc                     | j                   dk7  ryt        j                  j                  yt        j                  j                  |       ||fk\  S )z
    Returns True if the device's compute capability is (major, minor) or higher.
    Error out if the device is not a CUDA device.
    Returns False if device is a RoCM device.
    Returns True if device is a non-CUDA device.
    r)   TF)typero   r   r%  r)   get_device_capability)rd   r.  r/  s      r7   sm_is_or_higher_thanr3  "  sD     {{f}}$::++F3u~EEr6   	localhostr   T   )minutesFc                     t               }|rEt        |t        d      z        }t        j                  j
                  j                  | ||||      S t        j                  | |||||      S )zL
    Creates a TCP store. Retries if the chosen port is already in use.
    r   )milliseconds)wait_for_workers	use_libuv)r   r2   r   ro   classes	dist_c10dTCPStorer   )	addrrs   	is_mastertimeoutr9  	jit_classr:  porttimeout_milliseconds	            r7   create_tcp_storerD  3  sr     D!'I1,E"EF}}&&//$
I/B
 	
 }}-
 	
r6   i  !DISTRIBUTED_TESTS_DEFAULT_TIMEOUT300test_ddp_uneven_inputsi     test_join_kwargs	lazy_initc                     t         j                  dk(  s| !t        j                  j	                  d|      S t        j                  j	                  | |      S )Nr+  z	127.0.0.1)hostnamerJ  	interfacerJ  )rj   r,  r   ProcessGroupGloocreate_devicerM  s     r7   rP  rP  ^  s[    
||w)"3$$22 I 3 
 	
 $$229 3 
 	
r6   c                 Z    t         j                  | j                  d      d   t              S Nr$  r   )TIMEOUT_OVERRIDEr   r   TIMEOUT_DEFAULT)test_ids    r7   get_timeoutrV  i  s#    c 22 6HHr6   c               #   N  K   t               t               }} t        j                  t        j                  }}	 | |ct        _        t        _        t        j                  t        j                  f ||ct        _        t        _        y # ||ct        _        t        _        w xY wwrb   )r   rj   stdoutstderr)new_outnew_errold_outold_errs       r7   captured_outputr^  m  sl     z8:WGzz3::WG2!('
CJjj#**$$!('
CJ'
CJs   5B%9B	 1B%	B""B%rankrs   
num_inputsc                    ddt         dt         dt         dt         fd}dt         fd}t        |d      t        |d	      t        |d
      t        |d      t        |d	      t        |d
      fD cg c]N  }t        |      D cg c]  } ||| z  |z   ||z         c}t        |      D cg c]  } ||||z         c}fP c}}S c c}w c c}w c c}}w )z
    Generate a number of basic test cases for sparse reduction.
    These cover tensors with a varying number of sparse dimensions and a varying
    number of dense dimensions. The only reduction operation we support is sum.
    r   r_  rs   sparse_dims
dense_dimsc           	         t        j                  t        j                  | dz         d| dz   f      }|gt        |      D cg c]  }d c}z   }t        |dz
        D ]A  }t        j                  |t        j
                  d| dz         f      }|j                  |       C t        j                  | dz   gt        |      D cg c]  }d c}z         }t        j                  |||      S c c}w c c}w )Nr   r   )	ro   reshapearangerangecatzerosappendonessparse_coo_tensor)r_  rs   rb  rc  indices_shapevaluess           r7   generatez,simple_sparse_reduce_tests.<locals>.generate  s     --TAX 6D1HF5+<=a=={Q' 	%Aii%++a*B CDGLL$	% TAXJU:5F)G!)GGH&&w>>  > *Hs   	C+	C0
c           
      |    t        t        j                  t        |      D cg c]  } | ||       c}      S c c}w rb   )r	   operatoraddrg  )r   rs   r_  s      r7   compute_sumz/simple_sparse_reduce_tests.<locals>.compute_sum  s2    LLE*<MND2dJ/N
 	
Ns   9
)rb  r      )rc  )r   r   )r2   r   rg  )r_  rs   r`  rq  ru  r   is          r7   simple_sparse_reduce_testsrx  x  s    
?s 
? 
?# 
?s 
?
C 
 H!,H!,H!,H+H+H+
 	 z* :$q(*z*AB @EZ?PQ![Z*45Q	
  Rs$   5CC C/CC
Cr   c           
      f   t         j                  j                         }t        rt         j                  j                         }t
        rt         j                  j                         }t        |      }d}| |kD  r|| z  }t        |       D ci c]  }|t        |||z  |dz   |z          }}|S c c}w )zMultigpu tests are designed to simulate the multi nodes with multi
    GPUs on each node. Nccl backend requires equal #GPUs in each process.
    On a single node, all visible GPUs are evenly
    divided to subsets, each process only uses a subset.
    r   )	ro   r)   rp   r   r]   r    r*   rg  list)rs   r   nGPUsvisible_devicesnGPUs_per_processrw  rank_to_GPUs          r7   init_multigpu_helperr    s     JJ##%E		&&(		&&(ElO E!Z/ z" 	
4$5 5QBS8STUUK  	s   B.tmp_dirinit_methodc                    t        j                         at        j                  t        j
                  d<   t	        j                  t        j                  j                  t        j                  d             t	        j                  t        j                  j                  t        j                  d             t        j                  j                  t        j                  d      }t	        j                  |       | | t        j
                  d<   y t        t        j                  j                  |d      z   t        j
                  d<   y )NTEMP_DIRr   test_dirinit_dirINIT_METHODshared_init_file)
tempfileTemporaryDirectoryr  namerm   rn   mkdirpathjoinr   )r  init_dir_paths     r7   initialize_temp_directoriesr    s    ))+G$\\BJJzHHRWW\\',,	23HHRWW\\',,
34GGLLz:MHH]$/

=!$/"'',,-3
 %


=!r6   c                  :    t         t         j                          y y rb   )r  cleanupr5   r6   r7   cleanup_temp_dirr    s     r6      c            	       6    e Zd ZdZdZdefdZedefd       Zede	fd       Z
d Z	 dded	edd
f fdZd fdZd fdZdefdZddZddZ G d de      Zede	fd       Zede	dededd
fd       Zdedd
fdZddZddZddZedefd       Z xZS )MultiProcessTestCaser   
   r   c                      yr   r5   selfs    r7   _should_stop_test_suitez,MultiProcessTestCase._should_stop_test_suite  s    r6   c                      y)NTr5   r  s    r7   destroy_pg_upon_exitz)MultiProcessTestCase.destroy_pg_upon_exit  s    r6   c                     t         S rb   DEFAULT_WORLD_SIZEr  s    r7   rs   zMultiProcessTestCase.world_size      !!r6   c                 V    t              fd       }t        j                  ||       S )Nc                 j    | j                   | j                  k(  r| j                         y          y rb   )r_  MAIN_PROCESS_RANK_join_processesr  r   s    r7   ru   z1MultiProcessTestCase.join_or_run.<locals>.wrapper  s(    yyD222$$R(r6   r
   types
MethodTyper  r   ru   s    ` r7   join_or_runz MultiProcessTestCase.join_or_run  .    	r	 
	 ..r6   method_name
methodNameNc                     |dk7  r|}t         |   |       	 t        | |      }t        | || j	                  |             y # t
        $ r+}|dk7  rt        d| j                   d|       |Y d }~y d }~ww xY wNrunTestzno such test method in : super__init__r   setattrr  AttributeError
ValueError	__class__r  r  r  r   er  s        r7   r  zMultiProcessTestCase.__init__      
 "$K%		{+BD+t'7'7';< 	Y& !-dnn-=R
|L '	   (A 	A6!A11A6c                    t         |           i | _        g | _        g | _        | j
                  | _        t        j                  d      5 }|j                  | _
        d d d        i | _        y # 1 sw Y   i | _        y xY w)NFdelete)r  setUpspecial_return_code_checksskip_return_code_checks	processesr  r_  r  NamedTemporaryFiler  	file_namepid_to_pipe)r  r   r  s     r7   r  zMultiProcessTestCase.setUp  ss     13' .0$**	((6 	$!VVDN	$ 	$ s   A..A>c                 r    t         |           | j                  D ]  }|j                           g | _        y rb   )r  tearDownr  	terminate)r  pr  s     r7   r  zMultiProcessTestCase.tearDown1  s3     	AKKM	 r6   c                 F    | j                         j                  d      d   S rR  idr   r  s    r7   r   z'MultiProcessTestCase._current_test_name;  s    wwys#B''r6   c                    g | _         t        t        | j                              D ]  }t        j
                  j                         \  }} || j                  j                  dt        |      z   || j                         | j                  |fdt        | dd      i      }|j                          t        j                  d||j                          || j"                  |j                   <   | j                   j%                  |        y )Nprocess fake_pgF)targetr  rq   rr   Started process %s with pid %s)r  rg  r2   rs   ro   multiprocessingPiper  _runr4   r   r  r   startloggerinfopidr  rj  )r  procr_  parent_conn
child_connprocesss         r7   _start_processesz%MultiProcessTestCase._start_processes?  s    #doo./ 	+D&+&;&;&@&@&B#K~~**#d)+++-NN	 wtY>G MMOKK8$L,7DW[[)NN!!'*%	+r6   c                     	 t         j                  j                  d       t         j                  j	                  d      j
                  }| j                  |       y # t        $ r Y Fw xY w)Nspawn)ro   r  set_start_methodRuntimeErrorget_contextProcessr  )r  r  s     r7   _spawn_processesz%MultiProcessTestCase._spawn_processesU  s[    	!!227; $$009AAd#	  		s   A 	A('A(c                       e Zd ZdZy)MultiProcessTestCase.Eventr   N)r/   r0   r1   GET_TRACEBACKr5   r6   r7   Eventr  ^  s    r6   r  r_  c                    t         j                  d|       	 t        j                  j	                  | |g      }| |v r| j
                  rt         j                  d|       y | j                         }t         j                  d||       |t        j                  j                  k(  rt        j                  d      5 }t        j                  |       |j                          |j!                  d       | j#                  |j%                                t         j                  d|       d d d        ||v ry # 1 sw Y   xY w)Nz*Starting event listener thread for rank %sz:Pipe closed for process %s, stopping event listener threadzReceived event %s on process %szr+)moder   zProcess %s sent traceback)r  debugr  
connectionwaitclosedrecvr  r  r  r  r  r  faulthandlerdump_tracebackflushseeksendread)parent_pipesignal_piper_  ready_pipeseventtmp_files         r7   _event_listenerz$MultiProcessTestCase._event_listenera  s   A4H)4499;:TUKk)%%LLT #((*=udK066DDD!44$? G8$33H= ( a(#((9$?FG k)5  G Gs   :A,D55D>r   r  c                 T     | |      }||_         ||_        |j                  ||       y rb   )r_  r  run_testclsr_  r   r  r  rr   r  s          r7   r  zMultiProcessTestCase._run  s)     9~	"i-r6   c           	      (   t         j                  j                  d      \  }}t        j                  t
        j                  ||| j                  fd      }|j                          t        j                  dk7  r2t        j                  dk7  rt         j                  j                  d       dt        j                  d<   t        j                           	  t#        | |              ||j?                  d        |J |jA                          |jC                          | jD                  r	 tG        jH                          y y # t$        j&                  $ r[}t(        j+                  d	| j                  |t-        |             t        j.                  t0        d
   j2                         Y d }~d }~wt4        $ r t(        j7                  dt9        j:                         | j                  t
        j<                         |j?                  t9        j:                                t        j.                  t
        j<                         Y Zw xY w# ||j?                  d        |J |jA                          |jC                          w xY w# tJ        tL        f$ r Y y w xY w)NF)duplexT)r  rq   daemonr+  darwinr   TORCH_SHOW_CPP_STACKTRACESz4Process %s skipping test %s for following reason: %srM   z;Caught exception: 
%s exiting process %s with exit code: %s)'ro   r  r  	threadingThreadr  r  r_  r  rj   r,  _C'_set_print_stack_traces_on_fatal_signalrm   rn   r   set_rng_seedr   r   SkipTestr  r  r4   rk   rl   r-   r   r   	traceback
format_excTEST_ERROR_EXIT_CODEr  r  closer  r   destroy_process_groupAssertionErrorr  )r  r   r  signal_recv_pipesignal_send_pipeevent_listener_threadses          r7   r  zMultiProcessTestCase.run_test  s   -2-B-B-G-Gu-G-U** ) 0 0'77/;!

 	##%<<7"s||x'? HH<<TB36

/0!!#	 $GD)$&(  + %%d+(444!&&($$ **,	 %7    	6KKF		B	 HHZ	*4455 		@LLN$$&		$99	 Y1134HH)>>?		@  + %%d+(444!&&( #J/ sJ   D? )I? ?I AF(#I (BI <I ?I  I 9I<?JJc                    g }t        | j                        D ]h  \  }}|j                  | j                  |j                     }	 |j                  t        j                  j                         |j                  ||f       j |D ]x  \  }}	 |j                  d      rK|j                  rt        j                  d|       ;|j!                         }t        j#                  d||       nt        j#                  d|       z y # t        $ r t        j                  d|       Y w xY w# t        $ r t        j                  d|       Y w xY w)Nz>Encountered error while trying to get traceback for process %sr5  z5Pipe closed for process %s, cannot retrieve tracebackz)Process %s timed out with traceback: 

%sz6Could not retrieve traceback for timed out process: %s)	enumerater  exitcoder  r  r  r  r  r  rj  ConnectionErrorr  	exceptionpollr  r  r  r   )r  pipesrw  r  piper_  r  s          r7   _get_timedout_process_tracebackz4MultiProcessTestCase._get_timedout_process_traceback  s0   #DNN3 
	JAw'''4II288FFGLL!T+
	   	JD$99Q<{{S  ! $		ILLEtY LLPRV!	 ' $$X4 #   Ts*   <D3D' >D'D$#D$'E	E	c                    t        | j                               }t        j                         }d}	 	 t        | j                        D ]w  \  }}|j
                  t        j                  k(  s$t        d| d|j
                   d       t        j                  j                         }|D ]  }|j                           d} n |rnt        d | j                  D              rntt        j                         |z
  }	|	|kD  rA| j                          t        d| d       | j                  D ]  }|j                           nt        j                  d	       #t        j                         |z
  }
| j!                  ||
       | j"                  j%                         D ]  }|j'                           y # | j"                  j%                         D ]  }|j'                           w xY w)
NFTProcess z terminated with exit code z", terminating remaining processes.c              3   8   K   | ]  }|j                   d u  y wrb   )r  )r   r  s     r7   r  z7MultiProcessTestCase._join_processes.<locals>.<genexpr>  s     F!qzz-Fs   zTiming out after z" seconds and killing subprocesses.g?)rV  r  timer  r  r  r  r  printro   r  active_childrenr  allr  sleep_check_return_codesr  rp  r  )r  r   r@  
start_timesubprocess_errorrw  r  r$  acelapsedelapsed_timer  s               r7   r  z$MultiProcessTestCase._join_processes  s   dggi(YY[
 &	%dnn5 DAq zz%9%N%NN&qc)DQZZLPrs +0*?*?*O*O*Q"1 +BLLN++/( $Ft~~FF))+
2W$88:+G94VW "^^ &&

3= @  99;3L$$R6 ((//1 

((//1 

s   9F. +DF. .1Gc           
         | j                   st        j                  d       y| j                   d   }t        | j                         D cg c]&  \  }}|j                  t
        j                  k(  r||f( }}}|r[d}|D ]I  \  }}| j                  |j                     j                         }	|d| dt
        j                   d|	 dz  }K t        |      t        | j                         D ]#  \  }}|j                  t        d| d	| d
       || j                  v ryt        j                         D ]q  }
|j                  |
j                  k(  st        r1t        j!                  d| j#                         |
j$                          yt'        j(                  |
j$                         d}|| j*                  v r| j*                  |   }| j-                  |j                  |d| d|j                   d|j                          yc c}}w )z
        Checks that the return codes of all spawned processes match, and skips
        tests if they returned a return code indicating a skipping condition.
        z<Note: no subprocesses were spawned, test was likely skipped.Nr    r   z exited with error code z and exception:

 terminated or timed out after  seconds6Skipping %s on sandcastle for the following reason: %szExpected exit code z	 but got z
 for pid: )r   )r  r  warningr  r  r  r  r  r  r  r  r  rl   rp  r-   r   r  r  r.   r   r  r  assertEqual)r  r   r,  first_processrw  r  errored_processesr   r  error_messageskipexpected_return_codes               r7   r'  z(MultiProcessTestCase._check_return_codes  s#    ~~NNN q) "$..1
1zz1FFF F
 

 E/ 
7 $ 0 0 = B B Dqc!9:N:c:c9d e''4oR9 u%% dnn- 	DAqzz!"qc!@hW 	 ---%%' 	:D%%7 
 KKP	
 "++DLL99	:"  ! 000#'#B#B2#F "" %&:%;9]E[E[D\\fgtgxgxfyz 	 	
g
s   
+Hc                      | j                   dk(  S )Nr   r_  r  s    r7   r?  zMultiProcessTestCase.is_mastera  s    yyA~r6   r  r  r   N)r/   r0   r1   r  r  boolr  propertyr  r2   rs   r  r4   r  r  r  r   r  r  r   r  staticmethodr  classmethodr  r  r  r  r'  r?  __classcell__r  s   @r7   r  r    s8   
   d   "C " "/ ?H8;	&$(C (+,$    < ..#&.36.	. .6# 6t 6p&P*XJ
X 4  r6   r  c                   >     e Zd Z fdZd ZdefdZddZd Z xZ	S )DistributedTestBasec                     t         |           t        | j                        t        j
                  d<   | j                          y )Nrh   )r  r  r4   rs   rm   rn   r  r  r  s    r7   r  zDistributedTestBase.setUpl  s/    #&t#7

< r6   c                     	 t         j                  j                          	 t	        j
                  | j                         y # t        $ r Y ,w xY w# t        $ r Y y w xY wrb   )ro   distributedr  r  rm   remover  OSErrorr  s    r7   r  zDistributedTestBase.tearDownq  sU    	335	IIdnn%  		  		s"   A A 	AA	AAr   c                 "    d|v ryd|v ryd|v ryy)Nr)   r&   r]   r(   r*   r'   rX   r5   )r  rd   s     r7   r   zDistributedTestBase.backend{  s$    Vf_f_r6   c                    || j                   }t        j                  |      j                         }t        j                  j                  | j                  |      }t        j                  j                  | j                  |      || j                  |       d| j                  |      v sd| j                  |      v r)t        j                  j                  | j                         t        j                  j                  j                         S )Nr   rs   r_  storer&   r'   )rs   ro   get_device_modulerp   rI  	FileStorer  init_process_groupr   r_  acceleratorset_device_indexdistributed_c10d_get_default_group)r  rd   rs   num_visible_devicesrO  s        r7   	create_pgzDistributedTestBase.create_pg  s    J#55f=JJL!!++DNN<OP,,LL(!	 	- 	
 T\\&))Vt||F7K-K..tyy9  11DDFFr6   c                     t        j                  |      j                         }t        | j                        D ci c]	  }|||z  g c}S c c}w rb   )ro   rP  rp   rg  rs   )r  rd   rW  rw  s       r7   rank_to_devicez"DistributedTestBase.rank_to_device  sG    #55f=JJL6;DOO6LMA++,,MMMs   Arb   )
r/   r0   r1   r  r  r4   r   rX  rZ  rB  rC  s   @r7   rE  rE  k  s%     
 GNr6   rE  subtest_configtest_fntest_kwargsc                    t        |j                               }|D cg c]  }|d   	 }}|D cg c]  }|d   	 }}t        j                  | D ]  }	t	        t        ||	d            }
 | j                  di |
5  t        j                  j                           ||i ||
 t        j                  j                          ddd       t        j                           yc c}w c c}w # 1 sw Y   *xY w)a\  
    Runs a test function given by ``test_fn`` as a subtest according to the
    configurations specified by ``subtest_config``. This amortizes the
    costly setup overhead (including process spawn and initializing the
    process group) over the subtests.

    Args:
        subtest_config (Dict[str, List[Any]]): A mapping from subtest
            keyword argument name to a list of its possible values.
        test_fn (Callable): A callable that runs the actual test.
        test_args: Positional arguments to pass to ``test_fn``.
        test_kwargs: Keyword arguments to pass to ``test_fn``.
    r   r   T)strictNr5   )rz  items	itertoolsproductdictzipsubTestro   _dynamoresetr   r   )cls_instr[  r\  	test_argsr]  subtest_config_itemsitemsubtest_config_keyssubtest_config_valuesrp  subtest_kwargss              r7   run_subtestsro    s    * 9=^=Q=Q=S8T:N%O$d1g%O%OBV-W$d1g-W-W##%:; c"5vdKLX// 	"MM!Y@+@@MM!	" 	 &P-W	" 	"s   C"C'<AC,,C5	c                  n    	 t        j                  g dd      j                  dk(  S # t        $ r Y yw xY w)a   
    If shell command `fi_info -p efa -t FI_EP_RDM` returns exit code 0 then we assume that the machine has
    Libfabric EFA interfaces and EFA software components installed,
    see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html.
    )fi_infoz-pefaz-t	FI_EP_RDMF)checkr   )
subprocessrun
returncodeFileNotFoundErrorr5   r6   r7   has_efary    sA    NN;5j	
  s   %( 	44c                  "    t               rddgS dS )a  
    If the machine has Libfabric EFA interfaces and EFA software components installed it may cause
    'RuntimeError: In operator() at tensorpipe/common/ibv.h:172 "": Operation not supported' if tensorpipe
    uses InfiniBand transport, so we exclude it from tensorpipe transports,
    see https://github.com/pytorch/pytorch/issues/73885 and https://github.com/pytorch/pytorch/issues/65022
    shmuvN)ry  r5   r6   r7   tp_transportsr}    s     $IE4=/4/r6   c                 d      t        t        |      S d t                fd       }|S )z+
    Wrapper to use with a test method
    )r@  rs   c                      t               t        j                         }fd fd}g }t               D ]=  }t	        j
                  |||f      }|j                          |j                  |       ? |S )Nc                  >     t         j                  j                  k(  S rb   r   rU  _worldworlds   r7   world_is_validzaspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.world_is_valid      D118888r6   c                 ~   t        j                  d| |       	                  rt        j                          y y # t        $ rR}t        j                  j                  | t        j                         f       t        j                  |       Y d }~sd }~ww xY w#         rt        j                          w w xY w)Nthreadedr   r_  rs   rO  )r   rR  BaseExceptionMultiThreadedTestCaseexception_queueputrj   exc_infor$   exception_handler  )r_  world_pgrO  excallbackr  rs   s       r7   workerzYspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.worker  s    ##"*E
1
 "#..0 $ ! %55994:PQ!22  "#..0 $s*   A   	B	ABB BB B<r  rq   )r"   r   	HashStorerg  r  r  r  rj  )	rs   r  global_storer  threadsr_  tr  r  s	   ``     @@r7   #_run_test_method_with_multi_threadszIspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads  sq    $&~~'	9	1  *% 	D  dE<5PQAGGINN1	
 r6   c                 X    t         j                  j                  j                  d       	   fd      }t        j                  |       t         j                  j                  j                  d       y # t         j                  j                  j                  d       w xY w)NTc                       g i S rb   r5   )rq   rt   rr   r  s   r7   r   z?spawn_threads_and_init_comms.<locals>.wrapper.<locals>.<lambda>	  s    D$?$?$? r6   F)ro   r  _distributed_c10d_set_thread_isolation_moder  _join_threads)r  rq   rr   r  r  rt   rs   s   ``` r7   ru   z-spawn_threads_and_init_comms.<locals>.wrapper  sv     	""==dC	I9?G "//>HH&&AA%HEHH&&AA%Hs   %A> >+B))r   spawn_threads_and_init_commsr
   )rt   r@  rs   ru   r  s   ` ` @r7   r  r    sD     |('j
 	
> 4[
I 
I Nr6   c                       e Zd ZdZ ej
                         ZdZd Z	 dde	de	ddf fdZ
d	 Zd
 Zd fdZ fdZd Zed        Zd Zed        Zed        Zedefd       Zede	fd       ZddddZddddZ xZS )r  a5  
    Test runner that runs all tests with the in-proc process group using
    multiple threads with the threaded process group.

    Each test spawns world_size threads and run the test method in each thread.

    Difference from regular MultiProcess test runner:
    Must explicitly defines SetUp and call self._spawn_threads() to run the tests.
    Cannot use setUp / tearDown (must use perThreadSetup / perThreadShutdown)
        to set up / tear down each thread when running each test.
    No global state possible
        How bad of a limitation is this?
    r   c                 V    t              fd       }t        j                  ||       S )Nc                     | j                   | j                  k(  r| j                  | j                         y          y rb   )r_  MAIN_THREAD_RANKr  r  r  s    r7   ru   z2MultiThreadedTestCase.join_or_run.<locals>.wrapper'  s.    yyD111""4<<4r6   r  r  s    ` r7   r  z!MultiThreadedTestCase.join_or_run&  r  r6   r  r  r   Nc                     |dk7  r|}t         |   |       	 t        | |      }t        | || j	                  |             y # t
        $ r+}|dk7  rt        d| j                   d|       |Y d }~y d }~ww xY wr  r  r  s        r7   r  zMultiThreadedTestCase.__init__0  r  r  c                      y rb   r5   r  s    r7   perThreadSetUpz$MultiThreadedTestCase.perThreadSetUpC  s    r6   c                      y rb   r5   r  s    r7   perThreadTearDownz'MultiThreadedTestCase.perThreadTearDownG  s    r6   c                 x    t         |           | j                  | _        g | _        dt
        j                  d<   y)z
        setUp only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadSetUp
        r   r  N)r  r  r  r_  r  rm   rn   rG  s    r7   r  zMultiThreadedTestCase.setUpJ  s1    
 	))	36

/0r6   c                 0    t         |           g | _        y)z
        tearDown only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadTearDown
        N)r  r  r  rG  s    r7   r  zMultiThreadedTestCase.tearDownU  s    
 	r6   c                    t         j                  j                  j                  d       | j                  }t               t        j                         | j                  _	        fd} |       st        d      t        | j                        D ]e  }t        j                  | j                  j                  ||| j                  f      }|j!                          | j"                  j%                  |       g y)zk
        class method to spawn threads and run test, use this method in the SetUp of your TestCase
        Tc                  >     t         j                  j                  k(  S rb   r  r  s   r7   r  z<MultiThreadedTestCase._spawn_threads.<locals>.world_is_validg  r  r6   zInvalid worldr  N)ro   r  r  r  r   r"   r   r  r  r  r  rg  rs   r  r  r  r  r  rj  )r  r   r  r_  r  r  s        @r7   _spawn_threadsz$MultiThreadedTestCase._spawn_threads]  s     	""==dC++	$&&*nn&6#	9 //$//* 	#D  ~~**)T4??1SA GGILL"	#r6   c                     | |      }||_         t        |d      rWt        j                         |_        t
        j                  |j                  _        t
        j                  |j                  _	        |j                  |||       y )N_tls)r_  r   r  localr  r!   
_precision	precision_rel_tolrel_tolrun_test_with_threaded_pg)r   r   r_  rs   rr   r  s         r7   r  zMultiThreadedTestCase._runt  sb    9~	 4 !)DI"*"5"5DII ( 1 1DII&&y$
Cr6   c                    t        j                  d||| j                  j                         | j	                          	  t        | |              t        j                          | j                          y# t        $ rN}| j                  j                  |t        j                         f       t        j                  |       Y d}~wd}~ww xY w# t        j                          | j                          w xY w)zd
        Run the current test associated with `test_name` using the threaded process group.
        r  r  N)r   rR  r  r  r  r   r  r  r  rj   r  r$   r  r  r  )r  r   r_  rs   r  s        r7   r  z/MultiThreadedTestCase.run_test_with_threaded_pg  s     	!..--		
 			%$GD)$& &&(""$  	  $$dCLLN%;<.. 	 &&(""$s*   A5 5	C>ACC CC &C5c           
         t         }	 t        |      D ]f  \  }}|j                  t        d|             |j	                         s2t
        j                  j                  |t        t        d| d      d ff       h t        j                          g }| j                  j                         sF| j                  j                         }|j                  |       | j                  j                         sFt                t        j                   j"                  j%                  d       | j'                  |||       y # t                t        j                   j"                  j%                  d       w xY w)Nr   zRank failed to join in under r1  F)rT  r  r  maxis_aliver  r  r  TimeoutErrorr$   rg  emptyr   rj  r#   ro   r  r  r  r'  )r   r  r   r@  idxthreadfailed_ranksfailures           r7   r  z#MultiThreadedTestCase._join_threads  s-   !	I(1 VC7O,??$)99== , ,&CG9H$U!" !%	 ##%L))//1--113##G, ))//1 #$HH&&AA%Hgr: #$HH&&AA%Hs   <D9 B,D9 95E.c           	         d}d}|D ]0  \  }}|d   }t        |t        j                        r;t        j	                  d||t        |             |dk  sMt        d   j                  }at        |t              r)d| d| d	}	t        j                  |	       t        |	      t        |t              rEdj                  t        j                  |       }	t        j                  d
|	|       |d| d|	 dz  }t        |t              st!        |j"                        t$        u s|dk  s%|j"                  }3 t'        |      dkD  rt        |      |dkD  rqt        j)                         D ]Y  }
||
j                  k(  st*        r#t        j	                  d||
j,                          y t        j                  |
j,                         y y )Nr.  r   r   z3Thread %s skipping test %s for following reason: %sr   rM   zThread r0  z	 seconds
z'Caught exception: 
%s exiting thread %sz exited with exception:
r/  r2  )
isinstancer   r  r  r  r4   rl   r-   r  r   r  r   r  r  format_exception
SystemExitr1  coder2   r   rp  r   r.   )r   r  r@  r   	error_msg	skip_coder_  r  excr   r8  s              r7   r'  z)MultiThreadedTestCase._check_return_codes  s    		* 	)ND(1+C#x001IH	 q= *9 5 ? ?IC.v%DWIZXS!"3''C+ggi88(CDGdSwtf,EcU"MM	C,>S(Y] #I+	)0 y>Ay))q="))+ >.$T LL
 &//==> r6   c                     t         S rb   r  r  s    r7   rs   z MultiThreadedTestCase.world_size  r  r6   c                 F    | j                         j                  d      d   S rR  r  r  s    r7   r   z(MultiThreadedTestCase._current_test_name  s     wwys#B''r6   r   r;  c                J    | j                   |k(  r| j                  |||       yy)z
        The reason why we have this util function instead of
        self.assertEqual is all threads are sharing one CPU RNG
        so the assertion result is only reliable on rank 0
        N)r_  r4  r  r   yr   r_  s        r7   assertEqualOnRankz'MultiThreadedTestCase.assertEqualOnRank  s'     99Q3' r6   c                H    | j                   |k(  r| j                  ||       y y rb   )r_  assertNotEqualr  s        r7   assertNotEqualOnRankz*MultiThreadedTestCase.assertNotEqualOnRank  s#    991% r6   r<  r=  rb   )r/   r0   r1   __doc__queueQueuer  r  r  r4   r  r  r  r  r  r  rA  r  r  r  r'  r?  r2   rs   r   r  r  rB  rC  s   @r7   r  r    s     "ekkmO/ ?H8;	&	7#. D D%. ; ;: /> />b "C " " (C ( (( (&1 & &r6   r  c                        e Zd Zdeej
                  ej                  f   deddf fdZ	dej                  dej                  fdZ
 xZS )SaveForwardInputsModuleforward_inputscast_forward_inputsr   Nc                 t    t         |           t        j                  dd      | _        || _        || _        y )Nd   )r  r  nnLinearlr  r  r  r  r  r  s      r7   r  z SaveForwardInputsModule.__init__  s2    
 	3$,#6 r6   r   c                     || j                   | <   | j                  | j                  r3|j                  | j                  j                  j
                              S |      S rb   )r  r  r  toweightdtyper  r   s     r7   forwardzSaveForwardInputsModule.forward  sI    $%D!vv43K3Kadd466==../SSQRSSr6   r/   r0   r1   rc  r  Modulero   Tensorr>  r  r  rB  rC  s   @r7   r  r    sT    7RYY457 "7 
	7T T%,, Tr6   r  c                        e Zd Zdeej
                  ej                  f   deddf fdZ	dej                  dej                  fdZ
 xZS )SaveForwardInputsModelr  r  r   Nc                 t    t         |           t        ||      | _        t        ||      | _        || _        y rb   )r  r  r  c1c2r  r  s      r7   r  zSaveForwardInputsModel.__init__  s6    
 	).:MN).:MN,r6   r   c                 `    || j                   | <   | j                  | j                  |            S rb   )r  r  r  r  s     r7   r  zSaveForwardInputsModel.forward  s)    $%D!wwtwwqz""r6   r  rC  s   @r7   r  r    sQ    -RYY45- "- 
	-# #%,, #r6   r  c              #   X  K   |st         j                  j                  |        t         j                  j                         x}r|j                  nd}|t        j                  |      }dt        j                  d<   dt        j                  d<   |rp|rVt         j                  j                  j                  j                  j                         }t        j                  d|| |       nt        j                  || |       t         j                  j!                          t         j                  j"                  j$                  j'                          	 d  t         j                  j!                          t         j                  j"                  j$                  j'                          |rt        j(                          y y # t         j                  j!                          t         j                  j"                  j$                  j'                          |rt        j(                          w w xY ww)	Ncpur4  MASTER_ADDR6789MASTER_PORTfakerN  )r   r_  rs   )ro   rS  rT  current_acceleratorr1  r   get_default_backend_for_devicerm   rn   testing	_internalrI  r  	FakeStorerR  rf  rg  utilscountersclearr  )r_  rs   r   init_pgr  accdevice_typerO  s           r7   _dynamo_dist_per_rank_initr  "  s     **40 "--AACCSC%  55kB +BJJ} &BJJ}MM++77??IIKE##%	 ##G$:V	MM	MM  &&()$$**,&&(  	$$**,&&( s    EH*F> A(H*>A)H''H*c                   @     e Zd ZdZe fd       Ze fd       Z xZS )#DynamoDistributedSingleProcTestCasez
    Test harness for single-process dynamo distributed tests,
    initializes dist process group.

    Prefer this for simple tests, as it's easier to debug.
    c                    t         |           | j                  j                  t	        j
                  t        j                  ddd             d| _        t        j                  j                         j                  }| d| j                   | _        || j                  v rd n| j                  g| _        t        j                   t        j"                  |      | j                  d       y )Nr4  12355)r  r  r   r  r   )r_  rs   )r  
setUpClass_exit_stackenter_contextr   rc  rm   rn   r_  ro   rS  r  r1  rd   
device_idsr   rR  r  )r   rd   r  s     r7   r  z.DynamoDistributedSingleProcTestCase.setUpClassQ  s    %%JJ

#.#*	
 ""668==xq
+
!'3::!5CHH://7chhST	
r6   c                 J    t        j                          t        |           y rb   )r   r  r  tearDownClass)r   r  s    r7   r  z1DynamoDistributedSingleProcTestCase.tearDownClassf  s    ""$r6   )r/   r0   r1   r  rA  r  r  rB  rC  s   @r7   r   r   I  s0     
 
(    r6   r   c            	       H    e Zd ZdZedefd       Zededededdfd       Z	y)	"DynamoDistributedMultiProcTestCasea   
    Use this for tests that actually run on multiple GPUs.

    Decorate tests with @skip_if_lt_x_gpu(ngpu)

    Note: MultiProcTestCase spawns processes per test and is slow.
    Prefer MultiThreadedTestCase for most tests. Perhaps use this one
    sparingly for integration tests.
    r   c                 >    t         j                  j                         S rb   )ro   rS  rp   r  s    r7   rs   z-DynamoDistributedMultiProcTestCase.world_sizew  s      --//r6   r_  r   r  Nc                     t        j                  t        j                                 | |      }||_        ||_        |j                  ||       y rb   )r   
addHandlerloggingNullHandlerr_  r  r  r  s          r7   r  z'DynamoDistributedMultiProcTestCase._run{  sB     	W0023 9~	"i-r6   )
r/   r0   r1   r  r?  r2   rs   rA  r4   r  r5   r6   r7   r
  r
  l  sV     0C 0 0 	.	.#&	.36	.		. 	.r6   r
  c                   Z    e Zd ZU dZdZeed<   dZeed<   dZe	e
   ed<    ed      Zeed	<   d
Zeed<   ede	e
   fd       Zede
fd       Zedd       Zed        Zede
ddfd       Zed        Zedd       Ze fd       Ze fd       Zd fdZd Z	 dde
de
ddf fdZ xZS )MultiProcContinuousTestr   rs   r_  N	rdvz_filex   )secondsr@  Fpoison_pillr   c                      y)z
        ProcessGroup backend str.
        To be customized by sub test classes, e.g. "nccl".
        Otherwise we return None -- lazily decided by tensor.
        Nr5   )r   s    r7   backend_strz#MultiProcContinuousTest.backend_str       r6   c                 \    t         j                  j                         }|y|j                  S )Nr  )ro   rS  r  r1  )r   curr_devices     r7   r  z#MultiProcContinuousTest.device_type  s+    '';;=r6   c                      y)z
        ProcessGroup init options.
        To be customized by sub test classes, e.g. ProcessGroupNCCLOpTest
        Here we return None.
        Nr5   )r   high_priority_streams     r7   optszMultiProcContinuousTest.opts  r  r6   c                 8   |J t        |      t        j                  d<   t        j                  ||      }t        j
                  | j                         |||| j                         | j                         t        j                  j                         | _        y )N
LOCAL_RANK)r   rs   r_  rO  
pg_optionsr@  )r4   rm   rn   r   rQ  rR  r  r  r@  rU  rV  pg)r   r_  rs   r  rO  s        r7   _init_pgz MultiProcContinuousTest._init_pg  s{    $$$ $'t9

< y*5OO%!xxzKK	
 &&99;r6   rU  c                     |j                  dd      d   } | |      }| j                  |_        | j                  |_        t        ||      }t	        j
                           |di | y )Nr$  r   r!  r   r5   )rsplitr_  rs   r   r   r
  )r   rU  rr   r   r  r\  s         r7   _run_test_given_idz*MultiProcContinuousTest._run_test_given_id  sa     NN3N3B7	9~HH	..$	* 	!!# 	&r6   c                 h   d}d|cxk  r|k  sJ  J || _         || _        | j                  |||       t        j	                  d       	 |j                         }t        j	                  d|        |n$	 | j                  |       |j                  |       Ot        j	                  d
       |st3        j4                          y y # t        $ r}t        |t              rit        |dd       t        fdt        j                         D        d       }	|	r3|j                  t        j                   |	j"                               Y d }~d}t%        j&                         }
dj)                  t+        j,                  |
       }t/        d	|       }||_        |j                  |       Y d }~d }~ww xY w)NFr   zSetup completeTz	Got test r  c              3   B   K   | ]  }|j                   k(  s|  y wrb   )r-   )r   vr-   s     r7   r  z7MultiProcContinuousTest._worker_loop.<locals>.<genexpr>  s     Tq1;;);STs   r.  zException in worker process:
zTerminating ...)r_  rs   r#  r  r  r   r&  r  r  r  r  r   nextrl   rp  r   r  r.   rj   r  r  r  r  r  	__cause__r   r  )r   r_  rs   r  
task_queuecompletion_queueraised_exceptionrU  r  
skip_entryr  tb_strenhanced_exr-   s                @r7   _worker_loopz$MultiProcContinuousTest._worker_loop  s    D%:%%%%%# 	T:y1 	%&  nn&GLL9WI./2&&w/ $$W- L 	&'
  &&(  A ! 2b*- 'FD 9I "&TJ$5$5$7T"J "(,,X->->z?Q?Q-RS #' <<>!;!;X!FG*-KF8+TU(*% $$[1112s    7"C 	F1A4F,	AF,,F1c                 J   g | _         g | _        g | _        t        j                  d      5 }|j
                  | _        d d d        	 t        j                  j                  d       t        t        |            D ]	  }t        j                  j                         }t        j                  j                         }t        j                  j                  | j                  dt!        |      z   d||| j                  ||f      }|j#                          | j                   j%                  |       | j                  j%                  |       | j                  j%                  |       t&        j)                  d||j*                          y # 1 sw Y   LxY w# t        $ r Y ;w xY w)NFr  r  r  T)r  r  r  rq   r  )r  task_queuescompletion_queuesr  r  r  r  ro   r  r  r  rg  r2   r  r  r2  r4   r  rj  r  r  r  )r   rs   r   r_  r,  r-  r  s          r7   r  z(MultiProcContinuousTest._spawn_processes  sX    "((6 	#!FFCM	#
	!!227;
 #j/* 	ND..446J$44::<++33''#d)+JzCST	 4 G MMOMM  )OO"":.!!(()9:LL94M	N	# 	#  		s   FF F	F"!F"c                    t         |           | j                         }| j                  dk(  rPt	        j
                  |      j                         | _        | j                  dk(  rt        j                  d| d      t        j                  d| j                   d| j                   d|        | j                  | j                         y)	z
        Class-scope test fixture. Run once for entire test class, before any test starts.
        Set up the process group.
        r  r   zNo z devices availablezTesting class z on  N)r  r  r  rs   ro   rP  rp   r   r  r  r  r/   r  )r   r  r  s     r7   r  z"MultiProcContinuousTest.setUpClass1  s     	 oo'>>R"44[ANNPCN~~"''#k]:L(MNNS\\N$s~~.>a}M	
 	S^^,r6   c                    t         j                  d| j                   d       | j                  D ]  }|j	                  d        | j
                  D ]  }|j                           	 t        j                  | j                         t         j                  d| j                   d       t        | 9          y# t        $ r Y =w xY w)z
        Class-scope test fixture. Run once for entire test class, after all tests finish.
        Tear down the process group.
        zJoining z workersNzClass z	 finished)r  r  rs   r4  r  r  r  rm   rJ  r  rK  r  r/   r  r  )r   r,  r  r  s      r7   r  z%MultiProcContinuousTest.tearDownClassG  s     	x/x89// 	!JNN4 	! }} 	GLLN		IIcmm$ 	fS\\N)45	  		s   )B: :	CCc                    t         |           | j                  | _        | j                  j
                  r&t        j                  d| j                                t        | j                        D ]M  \  }}t        j                  d| d| j                                 |j                  | j                                O y)z5
        Test fixture. Run before each test.
        zPrevious test failed, skipping zSending Rank r  N)r  r  r  r_  r  r  r   r  r  r  r4  r  r  r  )r  rw  r,  r  s      r7   r  zMultiProcContinuousTest.setUp_  s     	 **	 >>%%##&Edggi[$QRR 't'7'78 	&MAzLL=2dggi[9:NN4779%	&r6   c                 V    t              fd       }t        j                  ||       S )Nc           	      j   | j                   | j                  k(  rt        j                  d| j	                                 t        | j                        D ]  \  }}|j                         }t        |t        j                        r|t        |t              rSt        j                  d| d| j	                          d| j                  j                          d| j                  _        ||| j	                         k(  sJ t        j                  d| d| j	                                  y          y )NzWaiting for workers to finish zDetected failure from Rank z in: z(, skipping rest of tests in Test class: TzMain proc detected rank z
 finished )r_  r  r  r  r  r  r5  r   r  r   r  r  r3  r  r/   r  )r  rw  r-  rvr   s       r7   ru   z>MultiProcContinuousTest._worker_run_main_wait.<locals>.wrapperr  s   yyD222=dggi[IJ+4T5K5K+L 'A')--/B!"h&7&78 !"m49!E$'') MEEI^^E\E\D]_ 6:2  ?*?LL21#Z	{K#, r6   r  r  s    ` r7   _worker_run_main_waitz-MultiProcContinuousTest._worker_run_main_waitq  s.    	r	 
	8 ..r6   r  r  c                     |dk7  r|}t         |   |       	 t        | |      }t        | || j	                  |             y # t
        $ r+}|dk7  rt        d| j                   d|       |Y d }~y d }~ww xY wr  )r  r  r   r  r=  r  r  r  r  s        r7   r  z MultiProcContinuousTest.__init__  s    
 "$K%		{+BD+t'A'A"'EF 	Y& !-dnn-=R
|L '	r  )Fr=  r<  )r/   r0   r1   r  rs   r2   r3   r_  r  r   r4   r   r@  r  r>  rA  r  r  r  r#  r&  r2  r  r  r  r  r=  r  rB  rC  s   @r7   r  r    sK   JD#N#Ix}#"3/GY/KHSM    C       < <$  4   ;) ;)z N N> - -*    .&$/J ?H8;	 r6   r  rb   r   )r   r=  r   )r  	functoolsra  r  r  rs  rm   r  ru  rj   r  r  r"  r  r  r   collections.abcr   
contextlibr   dataclassesr   datetimer   enumr   r   r	   r
   ior   typingr   r   r   r   unittest.mockr   ro   torch._dynamo.test_casetorch.cuda.nccltorch.distributedrI  r   torch.nnr  torch._C._autogradr   torch._C._distributed_c10dr   torch._logging._internalr   torch.testing._internalr   $torch.testing._internal.common_utilsr   r   r   r   r   r   r   r   r   r   r   r    r!   5torch.testing._internal.distributed.multi_threaded_pgr"   r#   r$   	getLoggerr/   r  setLevelINFOr  rc   HAS_ACCELERATORr,   rl   rT   re   rx   r   r   r   r   r   r>  r   r   r2   r   r   r   r   r   r   r   r   r   r   r   r   r  r
  r  r  r3   r  r&  r4   r  r)  r-  rd   r3  rD  rT  getenvrS  rP  rV  r^  rx  r  r  r  r  r  r  r  rE  rc  rz  ro  cachery  r}  r  r  r  r  r  r  rf  	test_caser   r
  r  r5   r6   r7   <module>rY     st         	   
       $ % !   , ,  3 3        ) 7 . 0     
		8	$  4 E? 3x38z 
8
C x$FG	
 Xb"BC x45 8B => 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? 8B >? HR>?  (267!" hr#LM#$ x
V%* 8B DE+, hr#BC-
4 * * **&6  &3 , s s ("$+\4
N: $+1$ D 
S
sCx *F Fc F# F$ F" 	a 
 
: O)"))$GOPO,c2  +.'(
T 
IC I 2 2(S (c (s (XS 3 2 26(--	. 5
Xc] 
d 
"  ~8 ~L+N. +N\d3i( 
 D   &0 
3E7tl&H l&^Tbii T #RYY #  :?#) #)L  %--*A*A*J*J   F.)< .8^h ^r6   