o
    Df                     @   sv  d Z ddlZddlZddlZddlmZ ddlmZmZm	Z	m
Z
 ddlZddlmZmZ ddlmZmZ ddlmZ ddlmZ ejd	d
Zedi dZedddhdZeddhdZG dd dejZeddeddddfddZeddededdfdedede de
e ef de	e  dede!d e"d!eej fd"d#Z#eddedfd$d%Z$dede
e ef de d!dfd&d'Z%dS )(z'Embedded workers for integration tests.    N)contextmanager)AnyIterableOptionalUnion)Celeryworker)_set_task_join_will_blockallow_join_result)Signal)anon_nodenameWORKER_LOGLEVELerrortest_worker_starting)nameproviding_argstest_worker_startedr   consumertest_worker_stoppedc                       sT   e Zd ZdZdZ fddZG dd dejjZ fddZ	d	d
 Z
dd Z  ZS )TestWorkControllerz3Worker that can synchronize on being fully started.Nc                    s   t  | _t j|i | | jjdd dkrPddlm	} | | _
t | _zddlm} |  W n	 ty=   Y nw tj| j
t | _| j  d S d S )N.preforkr   )Queue)pickling_support)	threadingEvent_on_startedsuper__init__pool_cls
__module__splitbilliardr   logger_queueosgetpidpidtblibr   installImportErrorlogginghandlersQueueListener	getLoggerqueue_listenerstart)selfargskwargsr   r   	__class__ V/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/contrib/testing/worker.pyr   #   s   

zTestWorkController.__init__c                   @   s   e Zd Zdd Zdd ZdS )zTestWorkController.QueueHandlerc                 C   s
   d|_ |S )NT)
from_queuer1   recordr6   r6   r7   prepare:   s   z'TestWorkController.QueueHandler.preparec                 C   s   t jr d S )N)r+   raiseExceptionsr9   r6   r6   r7   handleError?   s   z+TestWorkController.QueueHandler.handleErrorN)__name__r!   __qualname__r;   r=   r6   r6   r6   r7   QueueHandler9   s    r@   c                    s@    j r  j }| fdd t }|| t  S )Nc                    s   | j  jkot| dd S )Nr8   F)processr'   getattr)rr1   r6   r7   <lambda>F   s    z*TestWorkController.start.<locals>.<lambda>)r$   r@   	addFilterr+   r.   
addHandlerr   r0   )r1   handlerloggerr4   rD   r7   r0   C   s   

zTestWorkController.startc                 C   s    | j   tj| j| |d dS )z=Callback called when the Consumer blueprint is fully started.)senderr   r   N)r   setr   sendapp)r1   r   r6   r6   r7   on_consumer_readyK   s   

z$TestWorkController.on_consumer_readyc                 C   s   | j   dS )zWait for worker to be fully up and running.

        Warning:
            Worker must be started within a thread for this to work,
            or it will block forever.
        N)r   waitrD   r6   r6   r7   ensure_startedR   s   z!TestWorkController.ensure_started)r>   r!   r?   __doc__r$   r   r+   r,   r@   r0   rN   rP   __classcell__r6   r6   r4   r7   r      s    
r      soloTg      $@c              
   k   s    t j| d d}	z]t| f||||||d|2}	|rAddlm}
 t  |
 j|ddks2J W d   n1 s<w   Y  |	V  W d   n1 sNw   Y  W tj| |	d dS W tj| |	d dS tj| |	d w )	z[Start embedded worker.

    Yields:
        celery.app.worker.Worker: worker instance.
    )rJ   N)concurrencypoolloglevellogfileperform_ping_checkshutdown_timeoutrS   )ping)timeoutpong)rJ   r   )	r   rL   _start_worker_threadtasksr[   r
   delaygetr   )rM   rU   rV   rW   rX   rY   ping_task_timeoutrZ   r3   r   r[   r6   r6   r7   start_worker]   s2   "rc   rM   rU   rV   rW   rX   WorkControllerrY   rZ   returnc                 k   s&   t | || |rd| jv sJ | jtjdd}	|	jj W d   n1 s)w   Y  |d| |t |||d|	ddddd
|}
t
j|
jdd}|  |
  td	 z|
V  W d
dlm} d
|_|| | rttdd|_dS d
dlm} d
|_|| | rtdd|_w )zaStart Celery worker in a thread.

    Yields:
        celery.worker.Worker: worker instance.
    zcelery.pingTEST_BROKER)hostnameNwithout_heartbeatT)
rM   rU   rg   rV   rW   rX   ready_callbackrh   without_minglewithout_gossip)targetdaemonFr   )statezWorker thread failed to exit within the allocated timeout. Consider raising `shutdown_timeout` if your tasks take longer to execute.r6   )setup_app_for_workerr_   
connectionr%   environra   default_channelqueue_declarer   popr   Threadr0   rP   r	   celery.workerrn   should_terminatejoinis_aliveRuntimeError)rM   rU   rV   rW   rX   rd   rY   rZ   r3   connr   trn   r6   r6   r7   r^      sV   




r^   c           	      k   sP    ddl m}m} |   ||dg}|  z
dV  W |  dS |  w )zfStart worker in separate process.

    Yields:
        celery.app.worker.Worker: worker instance.
    r   )ClusterNodeztestworker1@%hN)celery.apps.multir}   r~   set_currentr0   stopwait)	rM   rU   rV   rW   rX   r3   r}   r~   clusterr6   r6   r7   _start_worker_process   s   r   c                 C   s8   |    |   |   dt| j_| jj||d dS )z9Setup the app to be used for starting an embedded worker.F)rW   rX   N)finalizer   set_defaulttypelog_setupsetup)rM   rW   rX   r6   r6   r7   ro      s
   ro   )&rQ   r+   r%   r   
contextlibr   typingr   r   r   r   celery.worker.consumerceleryr   r   celery.resultr	   r
   celery.utils.dispatchr   celery.utils.nodenamesr   rq   ra   r   r   r   r   rd   r   rc   intstrboolfloatr^   r   ro   r6   r6   r6   r7   <module>   s    ?'
7&