o
    Df                     @  s  d Z ddlmZ ddlZddlZddlZddlZddlmZ ddlm	Z	m
Z
mZ ddlmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ ddlmZmZ ddlmZ ddlmZ ddl m!Z! ddl"m#Z#m$Z$ ddl%m&Z& ddl'm(Z( ddl)m*Z* erddl+m,Z, dZ-dZ.dZ/dZ0dZ1dZ2ee3Z4eddZ5eddZ6G d d! d!Z7G d"d# d#e8Z9G d$d% d%e:Z;G d&d' d'Z<G d(d) d)Z=G d*d+ d+ej>Z>G d,d- d-Z?G d.d/ d/e?ej@ZAG d0d1 d1ejBZBG d2d3 d3ejCZCdS )4zPVirtual transport implementation.

Emulates the AMQ API for non-AMQ transports.
    )annotationsN)array)OrderedDictdefaultdict
namedtuple)count)Finalize)Empty)	monotonicsleep)TYPE_CHECKING)queue_declare_ok_t)ChannelErrorResourceError)
get_logger)base)emergency_dump_state)bytes_to_strstr_to_bytes)	FairCycleuuid   )STANDARD_EXCHANGE_TYPES)TracebackTypeHzlMessage could not be delivered: No queues bound to exchange {exchange!r} using binding key {routing_key!r}.
zkCannot redeclare exchange {0!r} in vhost {1!r} with different type, durable, autodelete or arguments value.z;Requeuing undeliverable message for queue %r: No consumers.z)Restoring {0!r} unacknowledged message(s)z#UNABLE TO RESTORE {0} MESSAGES: {1}binding_key_t)queueexchangerouting_keyqueue_binding_t)r   r   	argumentsc                   @  s    e Zd ZdZdd Zdd ZdS )Base64zBase64 codec.c                 C  s   t tt|S N)r   base64	b64encoder   selfs r)   U/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/transport/virtual/base.pyencodeF   s   zBase64.encodec                 C  s   t t|S r#   )r$   	b64decoder   r&   r)   r)   r*   decodeI      zBase64.decodeN)__name__
__module____qualname____doc__r+   r-   r)   r)   r)   r*   r"   C   s    r"   c                   @     e Zd ZdZdS )NotEquivalentErrorzAEntity declaration is not equivalent to the previous declaration.Nr/   r0   r1   r2   r)   r)   r)   r*   r4   M       r4   c                   @  r3   )UndeliverableWarningz.The message could not be delivered to a queue.Nr5   r)   r)   r)   r*   r7   Q   r6   r7   c                   @  sV   e Zd ZdZdZdZdZdddZdd Zdd Z	d	d
 Z
dd Zdd Zdd ZdS )BrokerStatez2Broker state holds exchanges, queues and bindings.Nc                 C  s&   |d u ri n|| _ i | _tt| _d S r#   )	exchangesbindingsr   setqueue_index)r'   r9   r)   r)   r*   __init__r   s   zBrokerState.__init__c                 C  s"   | j   | j  | j  d S r#   )r9   clearr:   r<   r'   r)   r)   r*   r>   w   s   

zBrokerState.clearc                 C  s   |||f| j v S r#   )r:   )r'   r   r   r   r)   r)   r*   has_binding|   s   zBrokerState.has_bindingc                 C  s.   t |||}| j|| | j| | d S r#   )r   r:   
setdefaultr<   add)r'   r   r   r   r!   keyr)   r)   r*   binding_declare   s   zBrokerState.binding_declarec                 C  sB   t |||}z| j|= W n
 ty   Y d S w | j| | d S r#   )r   r:   KeyErrorr<   remove)r'   r   r   r   rC   r)   r)   r*   binding_delete   s   zBrokerState.binding_deletec                   s<   z j |}W n
 ty   Y d S w  fdd|D  d S )Nc                   s   g | ]	} j |d qS r#   )r:   pop).0bindingr?   r)   r*   
<listcomp>   s    z5BrokerState.queue_bindings_delete.<locals>.<listcomp>)r<   rH   rE   )r'   r   r:   r)   r?   r*   queue_bindings_delete   s   z!BrokerState.queue_bindings_deletec                   s    fdd j | D S )Nc                 3  s&    | ]}t |j|j j| V  qd S r#   )r    r   r   r:   )rI   rC   r?   r)   r*   	<genexpr>   s
    
z-BrokerState.queue_bindings.<locals>.<genexpr>)r<   r'   r   r)   r?   r*   queue_bindings   s   
zBrokerState.queue_bindingsr#   )r/   r0   r1   r2   r9   r:   r<   r=   r>   r@   rD   rG   rL   rO   r)   r)   r)   r*   r8   U   s    

	r8   c                   @  s~   e Zd ZdZdZdZdZdZdddZdd Z	d	d
 Z
dd Zdd Zdd Zdd ZdddZdd ZdddZdd ZdS )QoSzQuality of Service guarantees.

    Only supports `prefetch_count` at this point.

    Arguments:
    ---------
        channel (ChannelT): Connection channel.
        prefetch_count (int): Initial prefetch count (defaults to 0).
    r   NTc                 C  sR   || _ |pd| _t | _d| j_t | _| jj| _| jj	| _
t| | jdd| _d S )Nr   Fr   )exitpriority)channelprefetch_countr   
_deliveredrestoredr;   _dirtyrB   
_quick_ack__setitem___quick_appendr   restore_unacked_once_on_collect)r'   rR   rS   r)   r)   r*   r=      s   


zQoS.__init__c                 C  s$   | j }| pt| jt| j |k S )zReturn true if the channel can be consumed from.

        Used to ensure the client adhers to currently active
        prefetch limits.
        )rS   lenrT   rV   r'   pcountr)   r)   r*   can_consume   s   zQoS.can_consumec                 C  s,   | j }|rt|t| jt| j  dS dS )a  Return the maximum number of messages allowed to be returned.

        Returns an estimated number of messages that a consumer may be allowed
        to consume at once from the broker.  This is used for services where
        bulk 'get message' calls are preferred to many individual 'get message'
        calls - like SQS.

        Returns
        -------
            int: greater than zero.
        r   N)rS   maxr\   rT   rV   r]   r)   r)   r*   can_consume_max_estimate   s   zQoS.can_consume_max_estimatec                 C  s   | j r|   | || dS )z&Append message to transactional state.N)rV   _flushrY   )r'   messagedelivery_tagr)   r)   r*   append   s   z
QoS.appendc                 C  s
   | j | S r#   )rT   r'   rd   r)   r)   r*   get      
zQoS.getc                 C  s>   | j }| j}	 z| }W n
 ty   Y dS w ||d q)z'Flush dirty (acked/rejected) tags from.r   N)rV   rT   rH   rE   )r'   dirty	delivered	dirty_tagr)   r)   r*   rb      s   z
QoS._flushc                 C     |  | dS )z8Acknowledge message and remove from transactional state.N)rW   rf   r)   r)   r*   ack   s   zQoS.ackFc                 C  s$   |r| j | j|  | | dS )z4Remove from transactional state and requeue message.N)rR   _restore_at_beginningrT   rW   r'   rd   requeuer)   r)   r*   reject   s   z
QoS.rejectc              
   C  s   |    | j}g }| jj}|j}|rEz| \}}W n	 ty"   Y n#w z|| W n tyB } z|||f W Y d}~nd}~ww |s|  |S )z$Restore all unacknowledged messages.N)	rb   rT   rR   _restorepopitemrE   BaseExceptionre   r>   )r'   rj   errorsrestorepop_message_rc   excr)   r)   r*   restore_unacked   s(   
zQoS.restore_unackedc                 C  s   | j   |   |du rtjn|}| j}| jr| jjsdS t	|ddr*|r(J dS z@|r_t
tt| j|d |  }|rett| \}}t
tt|||d t||d W d|_dS W d|_dS W d|_dS d|_w )zRestore all unacknowledged messages at shutdown/gc collect.

        Note:
        ----
            Can only be called once for each instance, subsequent
            calls will be ignored.
        NrU   )file)stderrT)r[   cancelrb   sysr|   rT   restore_at_shutdownrR   
do_restoregetattrprintRESTORING_FMTformatr\   rz   listzipRESTORE_PANIC_FMTr   rU   )r'   r|   state
unrestoredru   messagesr)   r)   r*   rZ     s4   


zQoS.restore_unacked_oncec                 O     dS )a  Restore any pending unacknowledged messages.

        To be filled in for visibility_timeout style implementations.

        Note:
        ----
            This is implementation optional, and currently only
            used by the Redis transport.
        Nr)   )r'   argskwargsr)   r)   r*   restore_visible2      zQoS.restore_visible)r   Fr#   )r/   r0   r1   r2   rS   rT   rV   r   r=   r_   ra   re   rg   rb   rm   rq   rz   rZ   r   r)   r)   r)   r*   rP      s"    
	

 rP   c                      s*   e Zd ZdZd fdd	Zdd Z  ZS )MessagezMessage object.Nc                   st   || _ |d }|d}|r|||d}t jd|||d |d|d|d||dd	d
	| d S )N
propertiesbodybody_encodingrd   content-typecontent-encodingheadersdelivery_infozutf-8)	r   rR   rd   content_typecontent_encodingr   r   r   
postencoder)   )_rawrg   decode_bodysuperr=   )r'   payloadrR   r   r   r   	__class__r)   r*   r=   A  s$   


zMessage.__init__c                 C  sJ   | j }| j| j|d\}}t| j}|dd  ||| j| j	|dS )Nr   compression)r   r   r   r   r   )
r   rR   encode_bodyr   rg   dictr   rH   r   r   )r'   propsr   rx   r   r)   r)   r*   serializableS  s   

zMessage.serializabler#   )r/   r0   r1   r2   r=   r   __classcell__r)   r)   r   r*   r   >  s    r   c                   @  s\   e Zd ZdZdddZdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dddZdd ZdS )AbstractChannelzAbstract channel interface.

    This is an abstract class defining the channel methods
    you'd usually want to implement in a virtual channel.

    Note:
    ----
        Do not subclass directly, but rather inherit
        from :class:`Channel`.
    Nc                 C     t d)zGet next message from `queue`.z$Virtual channels must implement _getNotImplementedError)r'   r   timeoutr)   r)   r*   _geto     zAbstractChannel._getc                 C  r   )zPut `message` onto `queue`.z$Virtual channels must implement _putr   )r'   r   rc   r)   r)   r*   _puts  r   zAbstractChannel._putc                 C  r   )z!Remove all messages from `queue`.z&Virtual channels must implement _purger   rN   r)   r)   r*   _purgew  r   zAbstractChannel._purgec                 C  r   )z<Return the number of messages in `queue` as an :class:`int`.r   r)   rN   r)   r)   r*   _size{  s   zAbstractChannel._sizec                 O  rl   )zDelete `queue`.

        Note:
        ----
            This just purges the queue, if you need to do more you can
            override this method.
        Nr   )r'   r   r   r   r)   r)   r*   _delete  s   zAbstractChannel._deletec                 K  r   )zCreate new queue.

        Note:
        ----
            Your transport can override this method if it needs
            to do something whenever a new queue is declared.
        Nr)   r'   r   r   r)   r)   r*   
_new_queue  r   zAbstractChannel._new_queuec                 K  r   )zVerify that queue exists.

        Returns
        -------
            bool: Should return :const:`True` if the queue exists
                or :const:`False` otherwise.
        Tr)   r   r)   r)   r*   
_has_queue  s   zAbstractChannel._has_queuec                 C  s
   | |S )z-Poll a list of queues for available messages.)rg   )r'   cyclecallbackr   r)   r)   r*   _poll     
zAbstractChannel._pollc                 C  s   |  |}||| d S r#   )r   )r'   r   r   rc   r)   r)   r*   _get_and_deliver  s   
z AbstractChannel._get_and_deliverr#   )r/   r0   r1   r2   r   r   r   r   r   r   r   r   r   r)   r)   r)   r*   r   c  s    

	

r   c                   @  s   e Zd ZdZeZeZdZeeZ	dZ
de iZdZedZdZdZdZdZd	Zd
d Z			djddZdkddZdlddZdkddZdd Z		dmddZ		dmddZ		dnddZ		dnddZd d! Zd"d# Z d$d% Z!d&d' Z"d(d) Z#d*d+ Z$d,d- Z%dod.d/Z&dod0d1Z'dod2d3Z(dod4d5Z)		dpd6d7Z*d8d9 Z+d:d; Z,dqd<d=Z-drd>d?Z.d@dA Z/dBdC Z0dsdDdEZ1dFdG Z2		dtdHdIZ3dudJdKZ4dLdM Z5drdNdOZ6drdPdQZ7dRdS Z8dTdU Z9dvd^d_Z:e;d`da Z<e;dbdc Z=e;ddde Z>dodfdgZ?dhdi Z@dS )wChannelzVirtual channel.

    Arguments:
    ---------
        connection (ConnectionT): The transport instance this
            channel is part of.
    TFr$   r   N)r   deadletter_queuer   	   c              	     s   | _ t  _d  _i  _g  _d  _d _ fdd j	 D  _ 
  _ j jj} jD ]}z
t |||  W q0 tyE   Y q0w d S )NFc                   s   i | ]	\}}|| qS r)   r)   )rI   typclsr?   r)   r*   
<dictcomp>  s    z$Channel.__init__.<locals>.<dictcomp>)
connectionr;   
_consumers_cycle_tag_to_queue_active_queues_qosclosedexchange_typesitems_get_free_channel_id
channel_idclienttransport_optionsfrom_transport_optionssetattrrE   )r'   r   r   toptsopt_namer)   r?   r*   r=     s&   



zChannel.__init__directc           	   	   C  s   |pd}|p	d| }|r$|| j jvr"td|| jjjpdddddS z#| j j| }| |||||||sEt	t
|| jjjpBdW dS  ty_   ||||pTi g d	| j j|< Y dS w )
zDeclare exchange.r   zamq.%sz*NOT_FOUND - no exchange {!r} in vhost {!r}/2   
   Channel.exchange_declare404N)typedurableauto_deleter!   table)r   r9   r   r   r   r   virtual_hosttypeof
equivalentr4   NOT_EQUIVALENT_FMTrE   )	r'   r   r   r   r   r!   nowaitpassiveprevr)   r)   r*   exchange_declare  s:   r   c                 C  s:   |  |D ]\}}}| j|ddd q| jj|d dS )z'Delete `exchange` and all its bindings.T)	if_unusedif_emptyN)	get_tablequeue_deleter   r9   rH   )r'   r   r   r   rkeyrx   r   r)   r)   r*   exchange_delete	  s   zChannel.exchange_deletec                 K  sh   |pdt   }|r"| j|fi |s"td|| jjjpdddd| j|fi | t|| 	|dS )zDeclare queue.z
amq.gen-%sz'NOT_FOUND - no queue {!r} in vhost {!r}r   r   Channel.queue_declarer   r   )
r   r   r   r   r   r   r   r   r   r   )r'   r   r   r   r)   r)   r*   queue_declare  s   r   c           	      K  sj   |r	|  |r	dS | j|D ]\}}}| |||||}| j||g|R i | q| j| dS )zDelete queue.N)r   r   rO   r   prepare_bindr   rL   )	r'   r   r   r   r   r   r   r   metar)   r)   r*   r     s   
zChannel.queue_deletec                 C  s   |  | d S r#   )r   rN   r)   r)   r*   after_reply_message_received'  r.   z$Channel.after_reply_message_received c                 C  r   )Nz(transport does not support exchange_bindr   r'   destinationsourcer   r   r!   r)   r)   r*   exchange_bind*  r   zChannel.exchange_bindc                 C  r   )Nz*transport does not support exchange_unbindr   r   r)   r)   r*   exchange_unbind.  r   zChannel.exchange_unbindc                 K  s   |pd}| j |||rdS | j |||| | j j| dg }| |||||}|| | jr?| j	|g|R   dS dS )z.Bind `queue` to `exchange` with `routing key`.z
amq.directNr   )
r   r@   rD   r9   rA   r   r   re   supports_fanout_queue_bind)r'   r   r   r   r!   r   r   r   r)   r)   r*   
queue_bind2  s   

zChannel.queue_bindc                   sh   | j ||| z| |}W n
 ty   Y d S w | |||||  fdd|D |d d < d S )Nc                   s   g | ]}| kr|qS r)   r)   )rI   r   binding_metar)   r*   rK   P  s    z(Channel.queue_unbind.<locals>.<listcomp>)r   rG   r   rE   r   r   )r'   r   r   r   r!   r   r   r)   r   r*   queue_unbindC  s   
zChannel.queue_unbindc                   s    fdd j jD S )Nc                 3  s0    | ]}  |D ]\}}}|||fV  q	qd S r#   )r   )rI   r   r   patternr   r?   r)   r*   rM   S  s    z(Channel.list_bindings.<locals>.<genexpr>r   r9   r?   r)   r?   r*   list_bindingsR  s   
zChannel.list_bindingsc                 K  
   |  |S )z%Remove all ready messages from queue.r   r   r)   r)   r*   queue_purgeW  r   zChannel.queue_purgec                 C  s   t  S r#   r   r?   r)   r)   r*   _next_delivery_tag[  s   zChannel._next_delivery_tagc                 K  sB   |  ||| |r| |j|||fi |S | j||fi |S )zPublish message.)_inplace_augment_messager   deliverr   )r'   rc   r   r   r   r)   r)   r*   basic_publish^  s   
zChannel.basic_publishc                 C  sJ   |  |d | j\|d< }|d }|j||  d |d j||d d S )Nr   r   )r   rd   r   r   r   )r   r   updater  )r'   rc   r   r   r   r   r)   r)   r*   r  h  s   

z Channel._inplace_augment_messagec                   sJ   |j |< j|  fdd}|jj|< j|   dS )zConsume from `queue`.c                   s*   j | d}sj||j  |S )NrR   )r   qosre   rd   )raw_messagerc   r   no_ackr'   r)   r*   	_callback{  s   z(Channel.basic_consume.<locals>._callbackN)r   r   re   r   
_callbacksr   rB   _reset_cycle)r'   r   r  r   consumer_tagr   r  r)   r  r*   basic_consumev  s   
zChannel.basic_consumec                 C  sh   || j v r2| j | |   | j|d}z| j| W n	 ty'   Y nw | jj|d dS dS )z Cancel consumer by consumer tag.N)	r   rF   r  r   rH   r   
ValueErrorr   r  )r'   r  r   r)   r)   r*   basic_cancel  s   
zChannel.basic_cancelc                 K  sD   z| j | || d}|s| j||j |W S  ty!   Y dS w )z+Get message by direct access (synchronous).r  N)r   r   r	  re   rd   r	   )r'   r   r  r   rc   r)   r)   r*   	basic_get  s   zChannel.basic_getc                 C  s   | j | dS )zAcknowledge message.N)r	  rm   )r'   rd   multipler)   r)   r*   	basic_ack     zChannel.basic_ackc                 C  s   |r| j  S td)zRecover unacked messages.z'Does not support recover(requeue=False))r	  rz   r   )r'   rp   r)   r)   r*   basic_recover  s   
zChannel.basic_recoverc                 C  s   | j j||d dS )zReject message.rp   N)r	  rq   ro   r)   r)   r*   basic_reject  s   zChannel.basic_rejectc                 C  s   || j _dS )zzChange QoS settings for this channel.

        Note:
        ----
            Only `prefetch_count` is supported.
        N)r	  rS   )r'   prefetch_sizerS   apply_globalr)   r)   r*   	basic_qos  s   zChannel.basic_qosc                 C  s   t | jjS r#   )r   r   r9   r?   r)   r)   r*   get_exchanges  s   zChannel.get_exchangesc                 C  s   | j j| d S )z%Get table of bindings for `exchange`.r   r   )r'   r   r)   r)   r*   r     r  zChannel.get_tablec                 C  s6   z
| j j| d }W n ty   |}Y nw | j| S )z.Get the exchange type instance for `exchange`.r   )r   r9   rE   r   )r'   r   defaultr   r)   r)   r*   r     s   
zChannel.typeofc                 C  s   |du r| j }|s|p|gS z| || ||||}W n ty)   g }Y nw |sD|durDtttj	||d | 
| |g}|S )zFind all queues matching `routing_key` for the given `exchange`.

        Returns
        -------
            list[str]: queue names -- must return `[default]`
                if default is set and no queues matched.
        Nr  )r   r   lookupr   rE   warningswarnr7   UNDELIVERABLE_FMTr   r   )r'   r   r   r  Rr)   r)   r*   _lookup  s&   




zChannel._lookupc                 C  s@   |j }| }d|d< | |d |d D ]}| || qdS )z.Redeliver message to its original destination.Tredeliveredr   r   N)r   r   r%  r   )r'   rc   r   r   r)   r)   r*   rr     s   zChannel._restorec                 C  r   r#   )rr   )r'   rc   r)   r)   r*   rn     rh   zChannel._restore_at_beginningc                 C  sN   |p| j j}| jr$| j r$t| dr| j| j|dS | j| j	||dS t
 )N	_get_manyr   )r   _deliverr   r	  r_   hasattrr'  r   r   r   r	   )r'   r   r   r)   r)   r*   drain_events  s   
zChannel.drain_eventsc                 C  s   t || js| j|| dS |S )z1Convert raw message to :class:`Message` instance.)r   rR   )
isinstancer   )r'   r
  r)   r)   r*   message_to_python  s   zChannel.message_to_pythonc                 C  s>   |pi }| di  | d|p| j ||||pi |pi dS )zPrepare message data.r   priority)r   r   r   r   r   )rA   default_priority)r'   r   r.  r   r   r   r   r)   r)   r*   prepare_message  s   zChannel.prepare_messagec                 C  r   )zEnable/disable message flow.

        Raises
        ------
            NotImplementedError: as flow
                is not implemented by the base virtual implementation.
        z%virtual channels do not support flow.r   )r'   activer)   r)   r*   flow  s   zChannel.flowc                 C  sp   | j s3d| _ t| jD ]}| | q| jr| j  | jdur(| j  d| _| jdur3| j	|  d| _
dS )zTClose channel.

        Cancel all consumers, and requeue unacked messages.
        TN)r   r   r   r  r   rZ   r   closer   close_channelr   )r'   consumerr)   r)   r*   r3    s   




zChannel.closec                 C  s"   |r| j |||fS ||fS r#   )codecsrg   r+   r'   r   encodingr)   r)   r*   r   $  s   zChannel.encode_bodyc                 C  s   |r| j ||S |S r#   )r6  rg   r-   r7  r)   r)   r*   r   )  s   zChannel.decode_bodyc                 C  s   t | j| jt| _d S r#   )r   r   r   r	   r   r?   r)   r)   r*   r  .  s   

zChannel._reset_cyclec                 C  s   | S r#   r)   r?   r)   r)   r*   	__enter__2  s   zChannel.__enter__exc_typetype[BaseException] | Noneexc_valBaseException | Noneexc_tbTracebackType | NonereturnNonec                 C  s   |    d S r#   )r3  )r'   r:  r<  r>  r)   r)   r*   __exit__5  s   zChannel.__exit__c                 C  s   | j jS )z/Broker state containing exchanges and bindings.)r   r   r?   r)   r)   r*   r   =  s   zChannel.statec                 C  s   | j du r| | | _ | j S )z&:class:`QoS` manager for this channel.N)r   rP   r?   r)   r)   r*   r	  B  s   
zChannel.qosc                 C  s   | j d u r	|   | j S r#   )r   r  r?   r)   r)   r*   r   I  s   
zChannel.cyclec              
   C  sV   zt tt|d d | j| j}W n tttfy!   | j}Y nw |r)| j| S |S )zGet priority from message.

        The value is limited to within a boundary of 0 to 9.

        Note:
        ----
            Higher value has more priority.
        r   r.  )	r`   minintmax_prioritymin_priority	TypeErrorr  rE   r/  )r'   rc   reverser.  r)   r)   r*   _get_message_priorityO  s   	
zChannel._get_message_priorityc                 C  s`   t | jj}td| jjd D ]}||vr | jj| |  S qtdt| jj	| jjd)Nr   z/No free channel ids, current={}, channel_max={})   r   )
r;   r   _used_channel_idsrangechannel_maxre   r   r   r\   channels)r'   used_channel_idsr   r)   r)   r*   r   c  s   
zChannel._get_free_channel_id)Nr   FFNFF)FF)NF)r   r   FN)Nr   Nr   )r   r   F)r   r#   )NN)NNNNN)T)r:  r;  r<  r=  r>  r?  r@  rA  )Ar/   r0   r1   r2   r   rP   r   r   r   r   r   r"   r6  r   r   _delivery_tagsr   r   r/  rF  rE  r=   r   r   r   r   r   r   r   r   r   r   r  r  r  r  r  r  r  r  r  r  r  r  r   r   r%  rr   rn   r+  r-  r0  r2  r3  r   r   r  r9  rB  propertyr   r	  r   rI  r   r)   r)   r)   r*   r     s    	






























r   c                      s0   e Zd ZdZ fddZdd Zdd Z  ZS )
Managementz'Base class for the AMQP management API.c                   s   t  | |j | _d S r#   )r   r=   r   rR   )r'   	transportr   r)   r*   r=   w  s   zManagement.__init__c                 C  s   dd | j  D S )Nc                 S  s   g | ]\}}}|||d qS ))r   r   r   r)   )rI   qerr)   r)   r*   rK   |  s    z+Management.get_bindings.<locals>.<listcomp>)rR   r   r?   r)   r)   r*   get_bindings{  s   zManagement.get_bindingsc                 C  s   | j   d S r#   )rR   r3  r?   r)   r)   r*   r3    r.   zManagement.close)r/   r0   r1   r2   r=   rW  r3  r   r)   r)   r   r*   rR  t  s
    rR  c                   @  s   e Zd ZdZeZeZeZdZdZ	dZ
dZdZdZejjjdeddgddZd	d
 Zdd Zdd Zdd Zdd ZdddZdd Zdd Zdd ZdddZedd ZdS ) 	Transportz|Virtual transport.

    Arguments:
    ---------
        client (kombu.Connection): The client this is a transport for.
    Ng      ?i  Fr   topic)asynchronousexchange_type
heartbeatsc                 K  s\   || _ t | _g | _g | _i | _| | j| jt| _	|j
d}|d ur'|| _tt| _d S )Npolling_interval)r   r8   r   rN  _avail_channelsr  Cycle_drain_channelr	   r   r   rg   r]  r   ARRAY_TYPE_HrK  )r'   r   r   r]  r)   r)   r*   r=     s   zTransport.__init__c                 C  s:   z| j  W S  ty   | |}| j| | Y S w r#   )r^  rH   
IndexErrorr   rN  re   )r'   r   rR   r)   r)   r*   create_channel  s   
zTransport.create_channelc                 C  sl   z1z	| j |j W n	 ty   Y nw z| j| W n	 ty%   Y nw W d |_d S W d |_d S d |_w r#   )rK  rF   r   r  rN  r   )r'   rR   r)   r)   r*   r4    s   
zTransport.close_channelc                 C  s   | j | |  | S r#   )r^  re   rc  r?   r)   r)   r*   establish_connection  s   zTransport.establish_connectionc              	   C  sP   | j   | j| jfD ]}|r%z| }W n	 ty   Y nw |  |sqd S r#   )r   r3  r^  rN  rH   LookupError)r'   r   	chan_listrR   r)   r)   r*   close_connection  s   
zTransport.close_connectionc                 C  s   t  }| jj}| j}|r|r||kr|}	 z
|| j|d W d S  ty?   |d ur5t  | |kr5t |d ur=t| Y nw q)Nr   r(  )	r
   r   rg   r]  r)  r	   socketr   r   )r'   r   r   
time_startrg   r]  r)   r)   r*   r+    s"   zTransport.drain_eventsc                 C  sX   |s	t d|z| j| }W n t y%   tt| | | Y d S w || d S )Nz.Received message without destination queue: {})rE   r   r  loggerwarningW_NO_CONSUMERS_reject_inbound_message)r'   rc   r   r   r)   r)   r*   r)    s   zTransport._deliverc                 C  sH   | j D ]}|r!|j||d}|j||j |j|jdd  d S qd S )Nr  Tr  )rN  r   r	  re   rd   r  )r'   r
  rR   rc   r)   r)   r*   rm    s   
z!Transport._reject_inbound_messagec                 C  s0   |r|| j vrtd||| j | | d S )Nz,Message for queue {!r} without consumers: {})r  rE   r   )r'   rR   rc   r   r)   r)   r*   on_message_ready  s   zTransport.on_message_readyc                 C  s   |j ||dS )N)r   r   )r+  )r'   rR   r   r   r)   r)   r*   r`  
  r.   zTransport._drain_channelc                 C  s   | j ddS )N	localhost)porthostname)default_portr?   r)   r)   r*   default_connection_params  s   z#Transport.default_connection_paramsr#   )r/   r0   r1   r2   r   r   r_  rR  r   rr  rN  r  r]  rM  r   rX  
implementsextend	frozensetr=   rc  r4  rd  rg  r+  r)  rm  rn  r`  rQ  rs  r)   r)   r)   r*   rX    s8    


rX  )Dr2   
__future__r   r$   rh  r~   r!  r   collectionsr   r   r   	itertoolsr   multiprocessing.utilr   r   r	   timer
   r   typingr   amqp.protocolr   kombu.exceptionsr   r   	kombu.logr   kombu.transportr   kombu.utils.divr   kombu.utils.encodingr   r   kombu.utils.schedulingr   kombu.utils.uuidr   r   r   typesr   ra  r#  r   rl  r   r   r/   rj  r   r    r"   	Exceptionr4   UserWarningr7   r8   rP   r   r   
StdChannelr   rR  rX  r)   r)   r)   r*   <module>   s^    


G #%B   R