o
    DfM                     @   s  d Z ddlZddlZddlmZmZmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ dZdZee Z!edg dZ"dd Z#dd Z$G dd deZ%dd Z&dd Z'e' dd Z(e'dd d!efgd"dd$d%Z)d&d' Z*e'd(d)d*d+d, Z+ej,j-fd-d.Z.ej/j0ej1j0fd/d0Z2e&d1d)d*dd2d3Z3e&d4d5d*dd6d7Z4dd8d9Z5e&d1d:e6fgd;d<d=d> Z7e&d?e6fd@e6fgdAdBdCd@ Z8e&d?e6fdDe9fdEe9fgdFdBddGdHZ:e' dIdJ Z;e& ddKdLZ<e& dMdN Z=e& dOdP Z>e& dQdR Z?e'd#dSddTdUZ@e'dVdWdXdY ZAe' dZd[ ZBe'd\d]d^d_ ZCd`da ZDe'dbd]dcdd ZEe'ded]ddfdgZFe'dhd]didj ZGe'dkdldmdnddodpZHe'dqdre6fdseIfdteIfgdudvddzd{ZJe' d|d} ZKe'd~eIfgddBdddZLe&deIfgddBdddZMe&deIfgddBdddZNe& dddZOe&deIfdeIfgddBdddZPe& dddZQe&de6fde6fde6fde6fgddB		dddZRe&de6fgddBdd ZSe' dd ZTdS )z.Worker remote control command implementations.    N)UserDictdefaultdict
namedtuple)TERM_SIGNAME)	safe_repr)WorkerShutdown)signals)
maybe_list)
get_logger)jsonify	strtobool)rate   state)Request)Panel)exchangerouting_key
rate_limitcontroller_info_t)aliastypevisibledefault_timeouthelp	signatureargsvariadicc                 C      d| iS )Nok valuer!   r!   N/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/worker/control.pyr          r    c                 C   r   )Nerrorr!   r"   r!   r!   r$   nok"   r%   r'   c                   @   s8   e Zd ZdZi Zi Zedd Ze			d
dd	ZdS )r   z+Global registry of remote control commands.c                 O   s(   |r| j di || S | j di |S )Nr!   )	_register)clsr   kwargsr!   r!   r$   register,   s   zPanel.registerNcontrolT      ?c
              
      s"    	f
dd}
|
S )Nc              	      s^   p| j }p| jpd dd }| j|< t 	|j|<  r-| j < | S )N 
r   )__name____doc__stripsplitdatar   meta)funcontrol_name_help
r   r   r)   r   r   namer   r   r   r   r!   r$   _inner7   s   



zPanel._register.<locals>._innerr!   )r)   r:   r   r   r   r   r   r   r   r   r;   r!   r9   r$   r(   2   s   
zPanel._register)	NNr,   Tr-   NNNN)	r0   
__module____qualname__r1   r4   r5   classmethodr+   r(   r!   r!   r!   r$   r   &   s    
r   c                  K      t jdddi| S )Nr   r,   r!   r   r+   r*   r!   r!   r$   control_commandD      rB   c                  K   r?   )Nr   inspectr!   r@   rA   r!   r!   r$   inspect_commandH   rC   rE   c                 C   s   t | j S )z6Information about Celery installation for bug reports.)r    app	bugreportr   r!   r!   r$   reportN      rH   	dump_confz[include_defaults=False]with_defaults)r   r   r   Fc                 K   s   t | jjj|dttdS )zList configuration.)rK   )	keyfilterunknown_type_filter)r   rF   conftable_wanted_config_keyr   )r   rK   r*   r!   r!   r$   rN   T   s   rN   c                 C   s   t | to
| d S )N__)
isinstancestr
startswith)keyr!   r!   r$   rP   `   s   rP   idsz[id1 [id2 [... [idN]]]])r   r   c                 K   s   dd t t|D S )z!Query for task information by id.c                 S   s    i | ]}|j t|| fqS r!   )id_state_of_taskinfo).0reqr!   r!   r$   
<dictcomp>l   s    zquery_task.<locals>.<dictcomp>)_find_requests_by_idr	   )r   rV   r*   r!   r!   r$   
query_taskf   s   
r^   c              	   c   s0    | D ]}z||V  W q t y   Y qw d S N)KeyError)rV   get_requesttask_idr!   r!   r$   r]   r   s   r]   c                 C   s   || rdS || rdS dS )Nactivereservedreadyr!   )request	is_activeis_reservedr!   r!   r$   rX   {   s
   rX   rb   c                 K   sR   t t|pg d}}t| |||fi |}t|tr!d|v r!|S td| dS )zRevoke task by task id (or list of ids).

    Keyword Arguments:
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).
    Nr    ztasks z flagged as revoked)setr	   _revokerR   dictr    )r   rb   	terminatesignalr*   task_idsr!   r!   r$   revoke   s
   ro   headersz/[key1=value1 [key2=value2 [... [keyN=valueN]]]]c                 K   s,  t |pt}t|trdd |D }| D ]\}}ttj	|p#g tt| }|tj|< q|s;t
d| dS ttj}	tt}
|	D ]=}t|dr|jr| D ].\}}||jv rt|}t|j| }t|t|@ }|r|
| | |j| jj|d qTqF|
st
d| dS t
d|
 dS )	a  Revoke task by header (or list of headers).

    Keyword Arguments:
        headers(dictionary): Dictionary that contains stamping scheme name as keys and stamps as values.
                             If headers is a list, it will be converted to a dictionary.
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).
    Sample headers input:
        {'mtask_id': [id1, id2, id3]}
    c                 S   s&   i | ]}| d d | d d qS )=r   r   )r3   )rZ   hr!   r!   r$   r\      s   & z-revoke_by_stamped_headers.<locals>.<dictcomp>zheaders z' flagged as revoked, but not terminatedstampsrm   z were not terminatedz revoked)_signalssignumr   rR   listitemsr	   worker_staterevoked_stampsgetr    active_requestsr   ri   hasattrrs   updaterl   consumerpool)r   rp   rl   rm   r*   rv   headerrs   updated_stampsr|   #terminated_scheme_to_stamps_mappingr[   expected_header_keyexpected_header_valueactual_headermatching_stamps_for_requestr!   r!   r$   revoke_by_stamped_headers   s0   
 

r   c           
      K   s   t |}t }tj| |rQt|pt}t|D ]&}|j	|vr@|
|j	 td|j	| |j| jj|d t ||kr@ nq|sGtdS tdd|S d|}	td|	 |S )NzTerminating %s (%s)rt   zterminate: tasks unknownzterminate: {}z, zTasks flagged as revoked: %s)lenri   ry   revokedr~   ru   rv   r   r]   rW   addloggerrY   rl   r   r   r    formatjoin)
r   rn   rl   rm   r*   size
terminatedrv   rf   idstrr!   r!   r$   rj      s&   

rj   rm   z <signal> [id1 [id2 [... [idN]]]])r   r   r   c                 K   s   t | |d|dS )z+Terminate task by task id (or list of ids).T)rl   rm   )ro   )r   rm   rb   r*   r!   r!   r$   rl      s   rl   	task_namer   z0<task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>)r   r   c              
   K   s   zt | W n ty } ztd|W  Y d}~S d}~ww z	|| jj| _W n ty>   tjd|dd td Y S w | j	
  |sPtd| tdS td	|| td
S )zTell worker(s) to modify the rate limit for a task by type.

    See Also:
        :attr:`celery.app.task.Task.rate_limit`.

    Arguments:
        task_name (str): Type of task to set rate limit for.
        rate_limit (int, str): New rate limit.
    zInvalid rate limit string: Nz&Rate limit attempt for unknown task %sTexc_infounknown taskz)Rate limits disabled for tasks of type %sz rate limit disabled successfullyz(New rate limit for tasks of type %s: %s.znew rate limit set successfully)r   
ValueErrorr'   rF   tasksr   r`   r   r&   r   reset_rate_limitsrY   r    )r   r   r   r*   excr!   r!   r$   r      s,   
softhardz#<task_name> <soft_secs> [hard_secs]c                 K   s`   z| j j| }W n ty   tjd|dd td Y S w ||_||_td||| t	dS )zTell worker(s) to modify the time limit for task by type.

    Arguments:
        task_name (str): Name of task to change.
        hard (float): Hard time limit.
        soft (float): Soft time limit.
    z-Change time limit attempt for unknown task %sTr   r   z5New time limits for tasks of type %s: soft=%s hard=%sztime limits set successfully)
rF   r   r`   r   r&   r'   soft_time_limit
time_limitrY   r    )r   r   r   r   r*   taskr!   r!   r$   r     s   r   c                 K   s   d| j jjiS )z Get current logical clock value.clock)rF   r   r#   r   r*   r!   r!   r$   r   =  rI   r   c                 K   s"   | j jr| j j||| dS dS )zHold election.

    Arguments:
        id (str): Unique election id.
        topic (str): Election topic.
        action (str): Action to take for elected actor.
    N)r   gossipelection)r   rW   topicactionr*   r!   r!   r$   r   C  s   	r   c                 C   s>   | j j}|jrd|jvr|jd td tdS tdS )z+Tell worker(s) to send task-related events.r   z)Events of group {task} enabled by remote.ztask events enabledztask events already enabled)r   event_dispatchergroupsr   r   rY   r    r   
dispatcherr!   r!   r$   enable_eventsP  s   
r   c                 C   s8   | j j}d|jv r|jd td tdS tdS )z3Tell worker(s) to stop sending task-related events.r   z*Events of group {task} disabled by remote.ztask events disabledztask events already disabled)r   r   r   discardr   rY   r    r   r!   r!   r$   disable_events[  s   

r   c                 C   s,   t d | jj}|jddditj dS )z3Tell worker(s) to send event heartbeat immediately.zHeartbeat requested by remote.worker-heartbeatfreq   N)r   )r   debugr   r   sendry   SOFTWARE_INFOr   r!   r!   r$   	heartbeatf  s   
r   )r   c                 K   sJ   || j kr#td| |rtj| tj  tjj| jj	
 dS dS )zRequest mingle sync-data.zsync with %s)r   r   N)hostnamer   rY   ry   r   r~   purge_datarF   r   forward)r   	from_noder   r*   r!   r!   r$   hellop  s   


r   g?)r   c                 K   s   t dS )zPing worker(s).pong)r    r   r!   r!   r$   ping  s   r   c                 K   s   | j j S )z&Request worker statistics/information.)r   
controllerstatsr   r!   r!   r$   r     s   r   dump_schedule)r   c                 K   s   t t| jjS )z0List of currently scheduled ETA/countdown tasks.)rw   _iter_schedule_requestsr   timerr   r!   r!   r$   	scheduled  s   r   c              
   c   sj    | j jD ]-}z|jjd }W n ttfy   Y qw t|tr2|jr(|j	 nd |j
| dV  qd S )Nr   )etapriorityrf   )schedulequeueentryr   
IndexError	TypeErrorrR   r   r   	isoformatr   rY   )r   waitingarg0r!   r!   r$   r     s   
r   dump_reservedc                 K   s.   |  tj|  tj }|sg S dd |D S )zAList of currently reserved tasks, not including scheduled/active.c                 S   s   g | ]}|  qS r!   rY   rZ   rf   r!   r!   r$   
<listcomp>  s    zreserved.<locals>.<listcomp>)tsetry   reserved_requestsr|   )r   r*   reserved_tasksr!   r!   r$   rd     s   

rd   dump_activec                    s    fdd|  tjD S )z'List of tasks currently being executed.c                    s   g | ]}|j  d qS )safer   r   r   r!   r$   r     s    zactive.<locals>.<listcomp>)r   ry   r|   )r   r   r*   r!   r   r$   rc     s   

rc   dump_revokedc                 K   s
   t tjS )zList of revoked task-ids.)rw   ry   r   r   r!   r!   r$   r     s   
r   
dump_taskstaskinfoitemsz[attr1 [attr2 [... [attrN]]]])r   r   r   c                    sJ   | j jpt|rndd D }fdd  fddt|D S )zList of registered tasks.

    Arguments:
        taskinfoitems (Sequence[str]): List of task attributes to include.
            Defaults to ``exchange,routing_key,rate_limit``.
        builtins (bool): Also include built-in tasks.
    c                 s   s    | ]
}| d s|V  qdS )zcelery.N)rT   rZ   r   r!   r!   r$   	<genexpr>  s    

zregistered.<locals>.<genexpr>c                    sB    fddD }|rdd |  D }d jd|S  jS )Nc                    s.   i | ]}t  |d d ur|tt  |d qS r_   )getattrrS   )rZ   fieldr   r!   r$   r\     s
    z5registered.<locals>._extract_info.<locals>.<dictcomp>c                 S   s   g | ]}d  |qS )rq   )r   )rZ   fr!   r!   r$   r     s    z5registered.<locals>._extract_info.<locals>.<listcomp>z{} [{}] )rx   r   r:   r   )r   fieldsrY   )r   r   r$   _extract_info  s   
z!registered.<locals>._extract_infoc                    s   g | ]} | qS r!   r!   r   )r   regr!   r$   r     s    zregistered.<locals>.<listcomp>)rF   r   DEFAULT_TASK_INFO_ITEMSsorted)r   r   builtinsr*   r   r!   )r   r   r   r$   
registered  s   
r   g      N@r   num	max_depthz.[object_type=Request] [num=200 [max_depth=10]])r   r   r      
   r   c                    s   zddl }W n ty   tdw td| tjdddd$}||d|  |j | fd	d
|jd d|jiW  d   S 1 sGw   Y  dS )a  Create graph of uncollected objects (memory-leak debugging).

    Arguments:
        num (int): Max number of objects to graph.
        max_depth (int): Traverse at most n levels deep.
        type (str): Name of object to graph.  Default is ``"Request"``.
    r   NzRequires the objgraph libraryzDumping graph for type %rcobjgz.pngF)prefixsuffixdeletec                    s   |  v S r_   r!   )vobjectsr!   r$   <lambda>  s    zobjgraph.<locals>.<lambda>)r   	highlightfilenamer   )	objgraphImportErrorr   rY   tempfileNamedTemporaryFileby_typeshow_backrefsr:   )r   r   r   r   	_objgraphfhr!   r   r$   r     s$   $r   c                 K   s   ddl m} | S )z Sample current RSS memory usage.r   )
sample_mem)celery.utils.debugr   )r   r*   r   r!   r!   r$   	memsample  s   r   samplesz[n_samples=10]c                 K   s(   ddl m} t }|j|d | S )z/Dump statistics of previous memsample requests.r   )r   )file)celery.utilsr   ioStringIOmemdumpgetvalue)r   r   r*   r   outr!   r!   r$   r    s   r  nz[N=1]c                 K   s4   | j jjr	tdS | j j| | j | tdS )z!Grow pool by n processes/threads.zJpool_grow is not supported with autoscale. Adjust autoscale range instead.zpool will grow)r   r   
autoscalerr'   r   grow_update_prefetch_countr    r   r  r*   r!   r!   r$   	pool_grow  s
   
r  c                 K   s6   | j jjr	tdS | j j| | j |  tdS )z#Shrink pool by n processes/threads.zLpool_shrink is not supported with autoscale. Adjust autoscale range instead.zpool will shrink)r   r   r  r'   r   shrinkr
  r    r  r!   r!   r$   pool_shrink  s
   
r  c                 K   s.   | j jjr| jjj|||d tdS td)zRestart execution pool.)reloaderzreload startedzPool restarts not enabled)rF   rN   worker_pool_restartsr   r   reloadr    r   )r   modulesr  r  r*   r!   r!   r$   pool_restart,  s   
r  maxminz[max [min]]c                 C   s:   | j jj}|r|||\}}td| d| S td)zModify autoscale settings.zautoscale now max=z min=zAutoscale not enabled)r   r   r  r~   r    r   )r   r  r  r  max_min_r!   r!   r$   	autoscale6  s
   
r  Got shutdown from remotec                 K   s   t | td)zShutdown worker(s).r   )r   warningr   )r   msgr*   r!   r!   r$   shutdownC  s   
r  r   r   exchange_typer   z'<queue> [exchange [type [routing_key]]]c                 K   s2   | j j| j j|||pd|fi | td| S )z2Tell worker(s) to consume from task queue by name.directzadd consumer )r   	call_soonadd_task_queuer    )r   r   r   r  r   optionsr!   r!   r$   add_consumerL  s   r"  z<queue>c                 K   s    | j | j j| td| S )z9Tell worker(s) to stop consuming from task queue by name.zno longer consuming from )r   r  cancel_task_queuer    )r   r   _r!   r!   r$   cancel_consumer^  s   r%  c                 C   s    | j jrdd | j jjD S g S )z:List the task queues a worker is currently consuming from.c                 S   s   g | ]
}t |jd dqS )T)recurse)rk   as_dict)rZ   r   r!   r!   r$   r   n  s    z!active_queues.<locals>.<listcomp>)r   task_consumerqueuesr   r!   r!   r$   active_queuesj  s
   r*  )F)FN)NNNr_   )NF)r   r   r   )r   )r   )NFN)NN)r  )Ur1   r  r   collectionsr   r   r   billiard.commonr   kombu.utils.encodingr   celery.exceptionsr   celery.platformsr   ru   celery.utils.functionalr	   celery.utils.logr
   celery.utils.serializationr   r   celery.utils.timer   r.   r   ry   rf   r   __all__r   r0   r   r   r    r'   r   rB   rE   rH   rN   rP   r^   requests__getitem__r]   r|   __contains__r   rX   ro   r   rj   rS   rl   r   floatr   r   r   r   r   r   r   r   r   r   r   rd   rc   r   r   intr   r   r  r  r  r  r  r  r"  r%  r*  r!   r!   r!   r$   <module>   s,   

	




6

$





	








				
