o
    Df                     @   s   d Z ddlmZ ddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZ dd	lmZ dd
lmZ ddlmZ ddlmZ dZeeZejejZZG dd dejZdS )z)Worker <-> Worker communication Bootstep.    )defaultdict)partial)heappush)
itemgetter)Consumer)	DummyLock)ContentDisallowedDecodeError)	bootsteps)
get_logger)Bunch   )Mingle)Gossipc                       s   e Zd ZdZd ZefZedddddddZd	d
hZ			d- fdd	Z
dd Zd.ddZdd Zdd Z fddZdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Z  ZS )/r   zfBootstep consuming events from other workers.

    This keeps the logical clock value up to date.
    idclockhostnamepidtopicactioncveramqpredisF      @       @c                    s  | o|  |j| _|j| _| |_|jjj| _|j| _d| jt|j	g| _
tt t t d| _|j| _| jrR|jjj| j| jdd| _|jrMt |_| jj| _|| _|| _d | _tt| _i | _| j| j d| _!|jj"| _"d| j#i| _$t% j&|fi | d S )N.)	node_join
node_leave	node_lostr   )on_node_joinon_node_leavemax_tasks_in_memory)zworker.electzworker.elect.acktask)'compatible_transportappenabledgossipeventsReceiverr   joinstrr   full_hostnamer   setontimerStater   r    statehubr   _mutexeventupdate_stateintervalheartbeat_interval_trefr   listconsensus_requestsconsensus_replieson_electon_elect_ackevent_handlersr   	call_taskelection_handlerssuper__init__)selfcwithout_gossipr5   r6   kwargs	__class__ V/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/worker/consumer/gossip.pyrA   $   sB   


zGossip.__init__c                 C   s:   |  }|jj| jv W  d    S 1 sw   Y  d S N)connection_for_read	transportdriver_typecompatible_transports)rB   r$   connrH   rH   rI   r#   M   s   
$zGossip.compatible_transportNc                 C   s$   g | j |< | jjd|||dd d S )Nzworker-electr   )r   r   r   r   )r:   
dispatchersend)rB   r   r   r   rH   rH   rI   electionQ   s
   

zGossip.electionc              
   C   sJ   z| j |  W d S  ty$ } ztd| W Y d }~d S d }~ww )NzCould not call task: %r)r$   	signatureapply_async	Exceptionlogger	exception)rB   r"   excrH   rH   rI   r>   X   s   zGossip.call_taskc           
   
   C   s   z|  |\}}}}}}}W n ty& }	 ztd|	W  Y d }	~	S d }	~	ww t| j| || d| ||f | jjd|d d S )Nz!election request missing field %sr   zworker-elect-ack)r   )_cons_stamp_fieldsKeyErrorrV   rW   r   r9   rP   rQ   )
rB   r3   id_r   r   r   r   r   _rX   rH   rH   rI   r;   ^   s   

zGossip.on_electc                    s   t  | |j| _d S rJ   )r@   startevent_dispatcherrP   )rB   rC   rF   rH   rI   r]   j   s   zGossip.startc           
      C   s   |d }z| j | }W n
 ty   Y d S w t| j }||d  t|t|kru| j| j	| \}}}}|| j
kr_td| z| j| }	W n tyY   td| Y nw |	| ntd|| | j	|d  | j |d  d S d S )Nr   r   zI won the election %rzUnknown election topic %rznode %s elected for %r)r:   rZ   r,   r0   alive_workersappendlenr   	sort_heapr9   r+   infor?   rV   rW   pop)
rB   r3   r   repliesr_   r\   leaderr   r   handlerrH   rH   rI   r<   n   s0   


zGossip.on_elect_ackc                 C       t d|j | | jj| d S )Nz%s joined the party)debugr   _call_handlersr-   r   rB   workerrH   rH   rI   r         zGossip.on_node_joinc                 C   rh   )Nz%s left)ri   r   rj   r-   r   rk   rH   rH   rI   r       rm   zGossip.on_node_leavec                 C   rh   )Nzmissed heartbeat from %s)rc   r   rj   r-   r   rk   rH   rH   rI   on_node_lost   rm   zGossip.on_node_lostc                 O   sR   |D ]$}z	||i | W q t y& } ztd|| W Y d }~qd }~ww d S )Nz!Ignored error from handler %r: %r)rU   rV   rW   )rB   handlersargsrE   rg   rX   rH   rH   rI   rj      s   zGossip._call_handlersc                 C   s,   | j d ur
| j   | j| j| j| _ d S rJ   )r7   cancelr.   call_repeatedlyr5   periodic)rB   rH   rH   rI   register_timer   s   

zGossip.register_timerc                 C   sR   | j j}t }| D ]}|js|| | | q|D ]	}||jd  qd S rJ   )	r0   workersr,   valuesaliveaddrn   rd   r   )rB   ru   dirtyrl   rH   rH   rI   rs      s   

zGossip.periodicc                 C   s>   |    | j|d| jd}t||jgt| j|j|jddgS )Nzworker.#)routing_key	queue_ttlT)queues
on_messageacceptno_ack)	rt   r(   r6   r   queuer   r}   event_from_messager~   )rB   channelevrH   rH   rI   get_consumers   s   zGossip.get_consumersc           	   
   C   s   |j d }|ddd dkrd S z| j| }W n	 ty!   Y nw ||jS |jdp1|jd }|| jkrbz||j\}}| | W d S  t	t
tfya } zt| W Y d }~d S d }~ww | j  d S )Nrz   r   r   r   r"   r   )delivery_infosplitr=   rZ   payloadheadersgetr   r4   r	   r   	TypeErrorrV   errorr   forward)	rB   preparemessage_typerg   r   r\   r3   rX   rH   rH   rI   r}      s*   


zGossip.on_message)Fr   r   rJ   )__name__
__module____qualname____doc__labelr   requiresr   rY   rN   rA   r#   rR   r>   r;   r]   r<   r   r    rn   rj   rt   rs   r   r}   __classcell__rH   rH   rF   rI   r      s2    )

r   N)r   collectionsr   	functoolsr   heapqr   operatorr   kombur   kombu.asynchronous.semaphorer   kombu.exceptionsr   r	   celeryr
   celery.utils.logr   celery.utils.objectsr   mingler   __all__r   rV   ri   rc   ConsumerStepr   rH   rH   rH   rI   <module>   s     