o
    Df?                     @   s*  d Z ddlZddlZddlZddlZddlZddlmZmZm	Z	 ddl
mZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZmZmZ dd
lmZmZmZmZ ddlmZ dZdZ dd Z!dd Z"dd Z#dd Z$dd Z%G dd dZ&G dd dZ'dd Z(G dd dZ)G d d! d!eZ*dS )"zStart/stop/manage workers.    N)OrderedDictUserListdefaultdict)partial)Popen)sleep)	from_utf8)cached_property)
IS_WINDOWSPidfilesignal_name)gethostnamehost_formatnode_format	nodesplit)saferepr)ClusterNodeceleryc                  G   s   d tf|  S )N )join
CELERY_EXE)args r   J/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/apps/multi.py
celery_exe      r   c                 C   sN   |}d| v rt | }t|\}}|} n| |  }t | d| }| ||fS )N@)r   r   )nameprefixsuffixhostnamenodename	shortnamer   r   r   build_nodename   s   
r$   c              	   C   s   t t| ||| dddS )Nz%iz%I)r   NdhiI)r   r   )r"   r#   r!   r   r   r   build_expander*   s   r*   c                 C   s.   |s| S |  dr|  d| S |  d| S )N--=r   )
startswith)optvaluer   r   r   
format_opt6   s
   
r0   c                 C   s   dd |   D S )Nc                 S   s<   i | ]\}}t |d krd|ddnd| | qS )   z--{}_-)lenformatreplace).0kvr   r   r   
<dictcomp>?   s    z+_kwargs_to_command_line.<locals>.<dictcomp>)items)kwargsr   r   r   _kwargs_to_command_line>   s   r=   c                   @   sD   e Zd Zdd Zdd ZdddZddd	Zdd
dZdddZdS )NamespacedOptionParserc                 C   s,   || _ t | _g | _d| _tdd | _d S )N c                   S   s   t  S N)r   r   r   r   r   <lambda>M   s    z1NamespacedOptionParser.__init__.<locals>.<lambda>)r   r   optionsvaluespassthroughr   
namespaces)selfr   r   r   r   __init__H   s
   zNamespacedOptionParser.__init__c                 C   s   dd | j D }d}|t|k ru|| }|dkr$d||d  | _d S |d dkrc|d dkr:| |dd   n/d }t||d krX||d  d dkrX||d  }|d7 }| |dd  | n| j| |d7 }|t|k sd S d S )	Nc                 S   s   g | ]}|r|qS r   r   )r7   argr   r   r   
<listcomp>P       z0NamespacedOptionParser.parse.<locals>.<listcomp>r   r+   r   r3   r1      )r   r4   r   rD   process_long_optprocess_short_optrC   append)rF   rargsposrH   r/   r   r   r   parseO   s$   $zNamespacedOptionParser.parseNc                 C   s,   d|v r| dd\}}| j||dd d S )Nr,   r1   Fshort)split
add_optionrF   rH   r/   r   r   r   rL   d   s   z'NamespacedOptionParser.process_long_optc                 C   s   | j ||dd d S )NTrR   )rU   rV   r   r   r   rM   i   s   z(NamespacedOptionParser.process_short_optc                 C   s$   |d u r| j }t|fi | j| S r@   )rB   r   rE   )rF   nsdefaultsr   r   r   optmergel   s   zNamespacedOptionParser.optmergeFc                 C   sB   |rdpd}| j }d|v r|d\}}| j| }|||| < d S )Nr3   r+   :)rB   rT   rE   )rF   r   r/   rS   rW   r   destr   r   r   rU   q   s   
z!NamespacedOptionParser.add_optionr@   )FN)	__name__
__module____qualname__rG   rQ   rL   rM   rY   rU   r   r   r   r   r>   F   s    


r>   c                   @   s   e Zd ZdZ	d*ddZdd Zdd Zd	d
 Zdd Zdd Z	d+ddZ
d+ddZejddddfddZd,ddZdd Zdd Zdd Zedd Zedd  Zed!d" Zejd#d" Zed$d% Zed&d' Zed(d) ZdS )-r   zRepresents a node in a cluster.Nc                 C   s\   || _ |pdtdd | _|| _|pd| _| |pt | _|  | _	| 
 | _d | _d S )Nz-m workerz--detachr?   )r   r   cmdrN   
extra_args_annotate_with_default_optsr   rB   _prepare_expanderexpander_prepare_argvargv_pid)rF   r   r`   rN   rB   ra   r   r   r   rG   }   s   



zNode.__init__c                 C   sD   | j |d< | |ddgd | |ddgd | |dgtj |S )	N-n	--pidfile-pz/var/run/celery/%n.pid	--logfile-fz/var/log/celery/%n%I.log--executable)r   _setdefaultoptsys
executable)rF   rB   r   r   r   rb      s
   
z Node._annotate_with_default_optsc              	   C   st   |dd  D ]}z|| W   S  t y   Y qw ||d tj|}tj|}|r8tj|s8t| |S )Nr1   r   )KeyError
setdefaultospathnormpathdirnameexistsmakedirs)rF   r&   altr/   r.   dir_pathr   r   r   rn      s   
zNode._setdefaultoptc                 C   s    | j dd\}}t| j ||S )Nr   r1   )r   rT   r*   )rF   r#   r!   r   r   r   rc      s   zNode._prepare_expanderc              	      s      jd}|dd } j } j D ]\}}|dv r3||t|  | |	| qd
|g}t| fdd| D   jg } jrY|   jf7 }|S )Nr   r   r1   )z-Az--appz-bz--brokerz--result-backendz--loaderz--configz	--workdirz-Cz
--no-colorz-qz--quietc                    s    g | ]\}}t | |qS r   )r0   rd   )r7   r.   r/   rF   r   r   rI      s    z&Node._prepare_argv.<locals>.<listcomp>)rd   r`   rT   indexrB   copyr;   insertr0   popr   tuplera   rN   )rF   r`   r(   rB   r.   r/   rf   r   r{   r   re      s*   



zNode._prepare_argvc                 C   s
   |  dS Nr   )sendr{   r   r   r   alive   s   
z
Node.alivec              
   C   sj   | j }|r.z	t|| W dS  ty- } z|jtjkr t||  W Y d }~dS d }~ww t||  d S )NFT)pidrs   killOSErrorerrnoESRCH
maybe_call)rF   sigon_errorr   excr   r   r   r      s   
z	Node.sendc                 K   s   | j | jf| j|d|S )N)rt   env)	_waitexecrf   rp   )rF   r   r<   r   r   r   start   s   z
Node.startc           	      C   sB   |  ||}t|| d||d t||d}| j| ||dS )Nr   )argstrr   )r   )on_signalled
on_failure)prepare_argvr   r   r   handle_process_exitwait)	rF   rf   rt   r   on_spawnr   r   r   piper   r   r   r      s   zNode._waitexecc                 C   s4   |dk rt || |  | S |dkrt || | |S r   )r   )rF   retcoder   r   r   r   r   r      s   zNode.handle_process_exitc                 C   s(   d |gt| }tjt|t dS )Nr   )posix)r   listshlexrT   r   r
   )rF   rf   rt   r   r   r   r   r      s   zNode.prepare_argvc              	   G   s8   |D ]}z| j | W   S  ty   Y qw t|d r   )rB   rq   )rF   ry   r.   r   r   r   getopt   s   zNode.getoptc                 C   s   dt | j d| j dS )N<z: >)typer\   r   r{   r   r   r   __repr__   s   zNode.__repr__c                 C      |  | ddS )Nri   rj   rd   r   r{   r   r   r   pidfile      zNode.pidfilec                 C   r   )Nrk   rl   r   r{   r   r   r   logfile   r   zNode.logfilec                 C   s6   | j d ur| j S zt| j W S  ty   Y d S w r@   )rg   r   r   read_pid
ValueErrorr{   r   r   r   r      s   
zNode.pidc                 C   s
   || _ d S r@   )rg   )rF   r/   r   r   r   r        
c                 C   s
   | j d S )Nrm   rB   r{   r   r   r   rp     r   zNode.executablec                 C   s   | j f| j S r@   )rp   rf   r{   r   r   r   argv_with_executable  s   zNode.argv_with_executablec                 K   s   | |t |dS )Nr   )r=   )clsr   r<   r   r   r   from_kwargs  s   zNode.from_kwargs)NNNNr@   )NN)r\   r]   r^   __doc__rG   rb   rn   rc   re   r   r   r   ro   rp   r   r   r   r   r   r	   r   r   propertyr   setterr   classmethodr   r   r   r   r   r   z   s@    










r   c                 O   s   | d ur| |i | d S d S r@   r   )funr   r<   r   r   r   r     s   r   c                   @   sR   e Zd ZeZ			dddZdd Zdd	 Zd
d Zdd Zdd Z	dddZ
dS )MultiParsercelery workerr?   r   c                 C   s"   || _ || _|| _|| _|| _d S r@   )r`   rN   r   r    range_prefix)rF   r`   rN   r   r    r   r   r   r   rG      s
   
zMultiParser.__init__c                    s   j }tjt|dk}jdjdj ddt }dp1ddj	p:|d	v rAdnd
dpKj
}|rbz
||}W n	 tya   Y nw | |  fdd|D S )Nr1   z--cmdz--appendz
--hostnamerh   z--prefixr?   z--suffix)z""z''z--range-prefixc              
   3   s&    | ]} | V  qd S r@   )_node_from_options)r7   r   rN   r`   rB   pr   rF   r    r   r   	<genexpr>=  s    
z$MultiParser.parse.<locals>.<genexpr>)rC   dictrB   r4   r   r   r`   rN   r   r    r   _get_rangesr   _update_ns_opts_update_ns_ranges)rF   r   namesrangesr!   r   r   r   r   rQ   )  s,   
zMultiParser.parsec                 C   s>   t |||\}}	}
|	|jv r|	n|}t|	||||||jS r@   )r$   rE   r   rY   rD   )rF   r   r   r   r    r`   rN   rB   	namespacer"   r2   r   r   r   r   C  s
   zMultiParser._node_from_optionsc                 C   s$   t |d }dd td|d D S )Nr   c                 S   s   g | ]}t |qS r   strr7   nr   r   r   rI   L  rJ   z+MultiParser._get_ranges.<locals>.<listcomp>r1   )intrange)rF   r   	noderanger   r   r   r   J  s   zMultiParser._get_rangesc              	   C   s|   t |j D ]4\}}| r;t|d }|dk r td|z|j||  | W q ty:   td|w qd S )Nr1   r   zIndexes start at 1 got: zNo node at index )r   rE   r;   isdigitr   rq   update
IndexError)rF   r   r   ns_namens_optsns_indexr   r   r   r   N  s   zMultiParser._update_ns_optsc                 C   s^   t |j D ]%\}}d|v s|r,d|v r,| ||D ]
}|j| | q|j| qd S )N,r3   )r   rE   r;   _parse_ns_ranger   r   )rF   r   r   r   r   subnsr   r   r   r   [  s   zMultiParser._update_ns_rangesFc                 C   sr   g }d|v r| dp|gD ](}|r1d|v r1| d\}}|dd tt|t|d D  q|| q|S )Nr   r3   c                 s   s    | ]}t |V  qd S r@   r   r   r   r   r   r   g  s    
z.MultiParser._parse_ns_range.<locals>.<genexpr>r1   )rT   extendr   r   rN   )rF   rW   r   retspacer   stopr   r   r   r   b  s   

zMultiParser._parse_ns_rangeN)r   r?   r?   r?   r   )F)r\   r]   r^   r   rG   rQ   r   r   r   r   r   r   r   r   r   r     s    
	r   c                   @   s   e Zd ZdZ																d"ddZdd Zdd Zd	d
 Zdd Zdd Z	e
jfddZdde
jfddZdde
jfddZdde
jfddZe
jdfddZdd Zd#ddZdd Zed d! ZdS )$r   zRepresent a cluster of workers.Nc                 C   sx   || _ |ptd| _|| _|| _|| _|| _|| _|| _|	| _	|
| _
|| _|| _|| _|| _|| _|| _|| _|| _d S )Nr_   )nodesr   r`   r   on_stopping_preambleon_send_signalon_still_waiting_foron_still_waiting_progresson_still_waiting_endon_node_starton_node_restarton_node_shutdown_okon_node_statuson_node_signalon_node_signal_deadon_node_downon_child_spawnon_child_signalledon_child_failure)rF   r   r`   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rG   r  s$   
zCluster.__init__c                    s    fdd D S )Nc                    s   g | ]}  |qS r   )
start_node)r7   noder{   r   r   rI     s    z!Cluster.start.<locals>.<listcomp>r   r{   r   r{   r   r     s   zCluster.startc                 C   s(   t | j| | |}t | j|| |S r@   )r   r   _start_noder   )rF   r   r   r   r   r   r     s   
zCluster.start_nodec                 C   s   |j | j| j| j| jdS )N)r   r   r   )r   r   r   r   r   )rF   r   r   r   r   r     s   zCluster._start_nodec                 C   s8   | j | jdD ]}t| j|t| ||| j qd S )Non_down)getpidsr   r   r   r   r   r   )rF   r   r   r   r   r   send_all  s   zCluster.send_allc                 C   s   |  tjS r@   )r   signalSIGKILLr{   r   r   r   r     s   zCluster.killc                    s&   g   fdd}j d||d  S )Nc                    s2   t j|  | }t j| |  | d S r@   )r   r   r   r   rN   )r   retvalretvalsrF   r   r   restart_on_down  s   
z(Cluster.restart.<locals>.restart_on_downrK   retryr   r   _stop_nodes)rF   r   r   r   r   r   restart  s   zCluster.restartc                 C      | j |||dS Nr   r   rF   r   callbackr   r   r   r   r     r   zCluster.stoprK   c                 C   r   r   r   r   r   r   r   stopwait  r   zCluster.stopwaitc                 C   sN   |d ur|n| j }t| j|d}|r#| j|||dD ]	}t|| qd S d S )Nr   )r   r   )r   r   r   shutdown_nodesr   )rF   r   r   r   r   r   r   r   r   r     s   zCluster._stop_nodesc                 c   s   t |}t| j| t  }|D ]}t| j|t| ||| js*|| |V  q||8 }|rt| j| d}|r{t  }|D ]&}|d7 }t| j	| |
 sft| j| || |V  t| j|  nq@||8 }|ry|t| sytt| |s;t| j d S d S )Nr   r1   )setr   r   r   r   r   r   addr   r   r   r   r4   r   floatr   )rF   r   r   r   P	to_remover   itsr   r   r   r     s@   

zCluster.shutdown_nodesc                 C   s$   | D ]}|j |kr|  S qt|r@   )r   rq   )rF   r   r   r   r   r   find  s
   
zCluster.findc                 c   s(    | D ]}|j r|V  qt|| qd S r@   )r   r   )rF   r   r   r   r   r   r     s   zCluster.getpidsc                 C   s(   dj t| tdd | D t| jdS )Nz<{name}({0}): {1}>c                 S   s   g | ]}|j qS r   r   r   r   r   r   rI     s    z$Cluster.__repr__.<locals>.<listcomp>r  )r5   r4   r   r   r\   r{   r   r   r   r     s   zCluster.__repr__c                 C   s   | j S r@   )r   r{   r   r   r   data  s   zCluster.data)NNNNNNNNNNNNNNNNNr@   )r\   r]   r^   r   rG   r   r   r   r   r   r   SIGTERMr   r   r   r   r   r  r   r   r   r  r   r   r   r   r   o  sD    
$
r   )+r   r   rs   r   r   ro   collectionsr   r   r   	functoolsr   
subprocessr   timer   kombu.utils.encodingr   kombu.utils.objectsr	   celery.platformsr
   r   r   celery.utils.nodenamesr   r   r   r   celery.utils.safereprr   __all__r   r   r$   r*   r0   r=   r>   r   r   r   r   r   r   r   r   <module>   s8    4 R