o
    Df!                     @   s   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
mZmZmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ dZ eeddZG dd de!Z"G dd dZ#G dd de#Z$dS )zIntegration testing utilities.    N)defaultdict)partial)count)AnyCallableDictSequenceTextIOTuple)ContentDisallowedretry_over_timestates)TimeoutError)AsyncResult	ResultSet)truncate)humanize_secondsz4Still waiting for {0}.  Trying again {when}: {exc!r}T)microsecondsc                   @   s   e Zd ZdZdS )SentinelzSignifies the end of something.N)__name__
__module____qualname____doc__ r   r   W/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/contrib/testing/manager.pyr      s    r   c                   @   s  e Zd ZdZ		d=ddZd>dd	Zd
d Z									d?ddZ			d@ddZdd Z	dAddZ
dBddZdCd d!ZdCd"d#Z		$dDd%d&Z		'dEd(d)Z		*dFd+d,Z	dCd-d.Zed/d0 ZdCd1d2Zd3d4 Zd5d6 ZdCd7d8Zd9d: Zd;d< ZdS )GManagerMixinz.Mixin that adds :class:`Manager` capabilities.      @FNc                 C   sF   |d u rt jn|| _|d u rt jn|| _| j j| _|| _|| _d S N)	sysstdoutstderrapp
connectionrecoverable_connection_errors
connerrorsblock_timeoutno_join)selfr'   r(   r!   r"   r   r   r   _init_manager   s
   
zManagerMixin._init_manager-c                 C   s   t | | | jd d S )N)file)printr!   )r)   ssepr   r   r   remark(   s   zManagerMixin.remarkc                 C   s   dd |D S )Nc                 S   s    g | ]}|j |jjvr|j qS r   )idbackend_cache).0resr   r   r   
<listcomp>.   s     z0ManagerMixin.missing_results.<locals>.<listcomp>r   )r)   rr   r   r   missing_results,   s   zManagerMixin.missing_resultsthingr   
   皙?      ?      @c              	      s@   |si n|} fdd}j ||f||||||	d|S )zWait for event to happen.

        The `catch` argument specifies the exception that means the event
        has not happened yet.
        c                    s>   t |}rtj t|dd| d r| || |S )Nin )whenexc)nextwarnE_STILL_WAITINGformatr   )rA   	intervalsretriesintervaldescemit_warningerrbackr)   r   r   on_errorG   s   z'ManagerMixin.wait_for.<locals>.on_error)argskwargsrL   max_retriesinterval_startinterval_stepr   )r)   funcatchrJ   rN   rO   rL   rP   rQ   rR   interval_maxrK   optionsrM   r   rI   r   wait_for0   s   
zManagerMixin.wait_for   {Gz?      ?c	           
   
   K   s2   z| j ||||||||dW S  |y   Y dS w )z;Make sure something does not happen (at least for a while).)rJ   rP   rQ   rR   rU   rK   zShould not have happened: N)rW   AssertionError)
r)   rS   rT   rJ   rP   rQ   rR   rU   rK   rV   r   r   r   ensure_not_for_a_whileY   s   z#ManagerMixin.ensure_not_for_a_whilec                 O   s   t |i |S r   r   )r)   rN   rO   r   r   r   r   j      zManagerMixin.retry_over_timec           	         s  | j rd S t|ts| j|g}g   fdd}|rt|ntdD ]d}g  d d < z|jd
||d|W   S  tjt	fyl } z$| 
|}| dt|t  t|td||d W Y d }~q#d }~w | jy } z| d|d W Y d }~q#d }~ww td	)Nc                    s     |  d S r   )append)task_idvaluereceivedr   r   	on_resultt   r]   z$ManagerMixin.join.<locals>.on_resultr   )callback	propagatez#Still waiting for {}/{}: [{}]: {!r}z, !zjoin: connection lost: z!Test failed: Missing task resultsr   )r(   
isinstancer   r#   ranger   getsockettimeoutr   r8   r0   rE   lenr   joinr&   r[   )	r)   r7   re   rP   rO   rc   irA   waiting_forr   ra   r   rm   m   s2   

zManagerMixin.join      @c                 C   s   | j jj|dS Nrk   )r#   controlinspect)r)   rk   r   r   r   rt      s   zManagerMixin.inspectc                 c   s(    |  |j| p
i }| E d H  d S r   )rt   
query_taskitems)r)   idsrk   tasksr   r   r   query_tasks   s   zManagerMixin.query_tasksc           	      C   sH   t t}| j||dD ]\}}| D ]\}\}}|| | qq|S rq   )r   setry   rv   add)	r)   rw   rk   r   hostnamereplyr_   state_r   r   r   query_task_states   s   zManagerMixin.query_task_states waiting for tasks to be acceptedc                 K      | j | j|f||d|S N)rH   rJ   )assert_task_worker_stateis_acceptedr)   rw   rH   rJ   policyr   r   r   assert_accepted      zManagerMixin.assert_accepted waiting for tasks to be receivedc                 K   r   r   )r   is_receivedr   r   r   r   assert_received   r   zManagerMixin.assert_received,waiting for tasks to be started or completedc                 K   r   r   )assert_task_state_from_resultis_result_task_in_progress)r)   async_resultsrH   rJ   r   r   r   r   ,assert_result_tasks_in_progress_or_completed   s   z9ManagerMixin.assert_result_tasks_in_progress_or_completedc                 K   $   | j t| j|||dtffi |S rq   rW   r   true_or_raiser   )r)   rS   resultsrH   r   r   r   r   r      s   z*ManagerMixin.assert_task_state_from_resultc                    s"   t jt jf t fdd| D S )Nc                 3   s    | ]}|j  v V  qd S r   )r~   )r4   resultpossible_statesr   r   	<genexpr>   s    z:ManagerMixin.is_result_task_in_progress.<locals>.<genexpr>)r   STARTEDSUCCESSall)r   rO   r   r   r   r      s   z'ManagerMixin.is_result_task_in_progressc                 K   r   rq   r   )r)   rS   rw   rH   r   r   r   r   r      s   z%ManagerMixin.assert_task_worker_statec                 K   s   | j g d|fi |S )N)reservedactiveready_ids_matches_stater)   rw   rO   r   r   r   r      s
   zManagerMixin.is_receivedc                 K   s   | j ddg|fi |S )Nr   r   r   r   r   r   r   r      s   zManagerMixin.is_acceptedc                    s&   | j ||dt fdd|D S )Nrr   c                 3   s4    | ] t  fd dfddD D V  qdS )c                 3   s    | ]} |v V  qd S r   r   )r4   r.   tr   r   r          z<ManagerMixin._ids_matches_state.<locals>.<genexpr>.<genexpr>c                    s   g | ]} | qS r   r   )r4   kr   r   r   r6      s    z=ManagerMixin._ids_matches_state.<locals>.<genexpr>.<listcomp>N)any)r4   expected_statesr   r   r   r      s
    "
z2ManagerMixin._ids_matches_state.<locals>.<genexpr>)r   r   )r)   r   rw   rk   r   r   r   r      s   zManagerMixin._ids_matches_statec                 O   s   ||i |}|st  |S r   )r   )r)   rS   rN   rO   r5   r   r   r   r      s   zManagerMixin.true_or_raisec              	   C   s   | j j}| j  E}	 |j|d}|dkrnq| }||_	 ztdd |  D }W n	 ty7   Y nw |dkr=n	qW d    d S W d    d S 1 sQw   Y  d S )NT)r$   r   c                 s   s    | ]}t |V  qd S r   )rl   )r4   r   r   r   r   r      r   z/ManagerMixin.wait_until_idle.<locals>.<genexpr>)	r#   rs   r$   purgert   sumr   valuesr   )r)   rs   r$   r   rt   r   r   r   wait_until_idle   s,   "zManagerMixin.wait_until_idle)r   FNN)r+   )	r9   r   NNr:   r;   r<   r=   F)r9   rX   r;   rY   rZ   F)Fr:   )rp   )r<   )r<   r   )r<   r   )r<   r   )r   r   r   r   r*   r0   r8   rW   r\   r   rm   rt   ry   r   r   r   r   r   staticmethodr   r   r   r   r   r   r   r   r   r   r   r      sZ    



*






	




r   c                   @   s   e Zd ZdZdd ZdS )Managerz(Test helpers for task integration tests.c                 K   s   || _ | jdi | d S )Nr   )r#   r*   )r)   r#   rO   r   r   r   __init__   s   zManager.__init__N)r   r   r   r   r   r   r   r   r   r      s    r   )%r   rj   r    collectionsr   	functoolsr   	itertoolsr   typingr   r   r   r   r	   r
   kombu.exceptionsr   kombu.utils.functionalr   celeryr   celery.exceptionsr   celery.resultr   r   celery.utils.textr   celery.utils.timer   _humanize_secondsrD   	Exceptionr   r   r   r   r   r   r   <module>   s(      P