o
    Df	                     @   s`   d Z ddlmZ ddlmZ ddlmZ dZeeZ	e	j
e	je	jZ
ZZG dd dejZd	S )
z-Worker <-> Worker Sync at startup (Bootstep).    )	bootsteps)
get_logger   )Events)Minglec                       sv   e Zd ZdZd ZefZddhZd fdd	Zdd Z	d	d
 Z
dd Zdd Zdd ZdddZdd Zdd Z  ZS )r   zBootstep syncing state with neighbor workers.

    At startup, or upon consumer restart, this will:

    - Sync logical clocks.
    - Sync revoked tasks.

    amqpredisFc                    s0   | o|  |j| _t j|fd|i| d S )Nwithout_mingle)compatible_transportappenabledsuper__init__)selfcr	   kwargs	__class__ V/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/worker/consumer/mingle.pyr      s   
zMingle.__init__c                 C   s:   |  }|jj| jv W  d    S 1 sw   Y  d S N)connection_for_read	transportdriver_typecompatible_transports)r   r   connr   r   r   r
       s   
$zMingle.compatible_transportc                 C   s   |  | d S r   )sync)r   r   r   r   r   start$   s   zMingle.startc                    sb   t d  }|r+t dtdd | D   fdd| D  t d d S t d d S )Nzmingle: searching for neighborszmingle: sync with %s nodesc                 S   s   g | ]\}}|r|qS r   r   ).0replyvaluer   r   r   
<listcomp>,   s    zMingle.sync.<locals>.<listcomp>c                    s"   g | ]\}}|r  ||qS r   )on_node_reply)r   nodenamer   r   r   r   r   r!   -   s    zmingle: sync completezmingle: all alone)info
send_hellolenitems)r   r   repliesr   r$   r   r   '   s   
zMingle.syncc                 C   sD   |j jjd|jd}|jjj}||j|j	pi }|
|jd  |S )Ng      ?)timeout
connection)r   controlinspectr+   
controllerstaterevokedhellohostname_datapop)r   r   r-   our_revokedr)   r   r   r   r&   3   s
   
zMingle.send_helloc              
   C   sd   t d| z| j|fi | W d S  ty     ty1 } ztd|| W Y d }~d S d }~ww )Nz mingle: processing reply from %szmingle: sync with %s failed: %r)debugsync_with_nodeMemoryError	Exception	exception)r   r   r#   r   excr   r   r   r"   :   s   
zMingle.on_node_replyNc                 K   s   |  || | || d S r   )on_clock_eventon_revoked_received)r   r   clockr0   r   r   r   r   r7   C   s   zMingle.sync_with_nodec                 C   s&   |r|j j| d S |j j  d S r   )r   r>   adjustforward)r   r   r>   r   r   r   r<   G   s   &zMingle.on_clock_eventc                 C   s   |r|j jj| d S d S r   )r.   r/   r0   update)r   r   r0   r   r   r   r=   J   s   zMingle.on_revoked_received)F)NN)__name__
__module____qualname____doc__labelr   requiresr   r   r
   r   r   r&   r"   r7   r<   r=   __classcell__r   r   r   r   r      s    	
	r   N)rE   celeryr   celery.utils.logr   eventsr   __all__rB   loggerr6   r%   r:   StartStopStepr   r   r   r   r   <module>   s    