o
    Df%                     @  s   d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ dd	lmZ dd
lmZ ddlmZ dZG dd de	ZdS )zAmazon SQS Connection.    )annotationsN)
Serializer)	transform)AsyncAWSQueryConnection)
AWSRequest   )boto3)AsyncMessage)
AsyncQueue)AsyncSQSConnectionc                      s
  e Zd ZdZd7 fdd	Zdd Zdd	 Zd8d
dZ	d9ddZd:ddZ	dd Z
d;ddZd8ddZ			d<ddZd8ddZd8ddZ	d8d d!Z	d9d"d#Zd8d$d%Z	d8d&d'Zd8d(d)Zd=d+d,Zd8d-d.ZeZd/d0 Zd8d1d2Z	d8d3d4Zd8d5d6Z  ZS )>r   zAsync SQS Connection.r   Nc                   s.   t d u rtdt j|f||d| d S )Nzboto3 is not installed)region_namedebug)r   ImportErrorsuper__init__)selfsqs_connectionr   regionkwargs	__class__ ^/home/ubuntu/webapp/venv/lib/python3.10/site-packages/kombu/asynchronous/aws/sqs/connection.pyr      s   
zAsyncSQSConnection.__init__c                 C  sD   |  }|r
||d< d|i}| dkrd|i}td||d|S )NActiondatagetparamsmethodurlr   )copylowerr   )r   	operationr   	queue_urlr   param_payloadr   r   r   _create_query_request    s   z(AsyncSQSConnection._create_query_requestc                 C  s   |  }||d< | jjj}||}| jjj}i }|jd }d| }	|	|d< d|jd |j	}
|
|d< t
||d}|jd	tj}td||d
|S )NQueueUrljsonVersionzapplication/x-amz-json-zContent-Typez{}.{}targetPrefixzX-Amz-Target)r   headersr   r   r   )r    r   metaservice_modeloperation_model	_endpointhostmetadataformatnamejsondumpshttpr   r   DEFAULT_METHODr   )r   r"   r   r#   r+   r,   r   r)   json_versioncontent_typetargetr$   r   r   r   r   _create_json_request-   s0   




z'AsyncSQSConnection._create_json_requestc                 C  s   | j j}| j jj}|j}|dkr| ||||}	n|dkr%| |||}	ntd| d|	j	 dkr6dnd}
|j
||	|
d |	 }| j||d	S )
ai  
        Overide make_request to support different protocols.

        botocore is soon going to change the default protocol of communicating
        with SQS backend from 'query' to 'json', so we need a special
        implementation of make_request for SQS. More information on this can
        be found in: https://github.com/celery/kombu/pull/1807.
        queryr2   zUnsupported protocol: .r   zpresign-urlstandard)signing_typecallback)r   _request_signerr*   r+   protocolr%   r9   	Exceptionr   r!   signprepare_mexe)r   operation_namer   r#   verbr?   signerr+   rA   requestr=   prepared_requestr   r   r   make_requestO   s$   	
zAsyncSQSConnection.make_requestc                 C  s*   d|i}|rt |d|d< | jd||dS )N	QueueNamedDefaultVisibilityTimeoutCreateQueuer>   )r0   
get_object)r   
queue_namevisibility_timeoutr?   r   r   r   r   create_queuen   s   zAsyncSQSConnection.create_queueFc                 C  s   | j dd |j|dS )NDeleteQueuer>   
get_statusid)r   queueforce_deletionr?   r   r   r   delete_queuex   s   zAsyncSQSConnection.delete_queuec                 C  s   | j j|d}|d S )N)rL   r&   )r   get_queue_url)r   rX   resr   r   r   r[   |   s   z AsyncSQSConnection.get_queue_urlAllc                 C     | j dd|i|j|dS )NGetQueueAttributesAttributeNamer>   )rP   rW   )r   rX   	attributer?   r   r   r   get_queue_attributes   s   z'AsyncSQSConnection.get_queue_attributesc                 C     | j d||d|j|dS )NSetQueueAttribute)zAttribute.NamezAttribute.Valuer>   rU   )r   rX   ra   valuer?   r   r   r   set_queue_attribute   s
   z&AsyncSQSConnection.set_queue_attributer   ApproximateReceiveCountc                 C  sx   d|i}|r
||d< |r&i }	t |D ]\}
}||	dt|
d  < q||	 |d ur.||d< | jd|dtfg|||dS )	NMaxNumberOfMessagesVisibilityTimeoutzAttributeName.r   WaitTimeSecondsReceiveMessageMessage)r?   parent)	enumeratestrupdateget_listr	   )r   rX   r#   number_messagesrR   
attributeswait_time_secondsr?   r   attrsidxattrr   r   r   receive_message   s   
z"AsyncSQSConnection.receive_messagec                 C  s   |  |||S N)delete_message_from_handler   rX   receipt_handler?   r   r   r   delete_message   s   z!AsyncSQSConnection.delete_messagec                 C  s\   i }t |D ]\}}d|d  }|| d|j| d|ji q| jd||jd|dS )NzDeleteMessageBatchRequestEntry.r   .Id.ReceiptHandleDeleteMessageBatchPOSTrG   r?   ro   rq   rW   r}   rP   )r   rX   messagesr?   r   imprefixr   r   r   delete_message_batch   s   z'AsyncSQSConnection.delete_message_batchc                 C  s   | j dd|i||dS )NDeleteMessageReceiptHandler>   )rV   r|   r   r   r   r{      s   z-AsyncSQSConnection.delete_message_from_handlec                 C  s.   d|i}|rt ||d< | jd||jd|dS )NMessageBodyDelaySecondsSendMessager   r   )intrP   rW   )r   rX   message_contentdelay_secondsr?   r   r   r   r   send_message   s   zAsyncSQSConnection.send_messagec              
   C  sn   i }t |D ]%\}}d|d  }|| d|d | d|d | d|d i q| jd||jd	|d
S )NzSendMessageBatchRequestEntry.r   r   r   z.MessageBodyz.DelaySeconds   SendMessageBatchr   r   )ro   rq   rP   rW   )r   rX   r   r?   r   r   msgr   r   r   r   send_message_batch   s   z%AsyncSQSConnection.send_message_batchc                 C  rc   )NChangeMessageVisibility)r   rj   r>   rU   )r   rX   r}   rR   r?   r   r   r   change_message_visibility   s   z,AsyncSQSConnection.change_message_visibilityc              
   C  sr   i }t |D ]'\}}d|d  }|| d|d j| d|d j| d|d i q| jd||jd|d	S )
Nz)ChangeMessageVisibilityBatchRequestEntry.r   r   r   r   z.VisibilityTimeoutChangeMessageVisibilityBatchr   r   r   )r   rX   r   r?   r   r   tprer   r   r   change_message_visibility_batch   s   z2AsyncSQSConnection.change_message_visibility_batch c                 C  s(   i }|r||d< | j d|dtfg|dS )NQueueNamePrefix
ListQueuesr&   r>   )rr   r
   )r   r   r?   r   r   r   r   get_all_queues   s   z!AsyncSQSConnection.get_all_queuesc                 C  s   |  |t| j||S rz   )r   r   _on_queue_ready)r   rQ   r?   r   r   r   	get_queue   s   zAsyncSQSConnection.get_queuec                   s   t  fdd|D d S )Nc                 3  s     | ]}|j  r|V  qd S rz   )r   endswith).0qr1   r   r   	<genexpr>   s    z5AsyncSQSConnection._on_queue_ready.<locals>.<genexpr>)next)r   r1   queuesr   r   r   r      s   z"AsyncSQSConnection._on_queue_readyc                 C  s   | j dd|jidtfg|dS )NListDeadLetterSourceQueuesr&   r>   )rr   r   r
   )r   rX   r?   r   r   r   get_dead_letter_source_queues   s
   
z0AsyncSQSConnection.get_dead_letter_source_queuesc                 C  s   | j d|||d|j|dS )NAddPermission)LabelAWSAccountId
ActionNamer>   rU   )r   rX   labelaws_account_idaction_namer?   r   r   r   add_permission  s   z!AsyncSQSConnection.add_permissionc                 C  r^   )NRemovePermissionr   r>   rU   )r   rX   r   r?   r   r   r   remove_permission  s   z$AsyncSQSConnection.remove_permission)r   Nrz   )NN)FN)r]   N)r   Nrg   NN)r   N)__name__
__module____qualname____doc__r   r%   r9   rK   rS   rZ   r[   rb   rf   ry   r~   r   r{   r   r   r   r   r   r   lookupr   r   r   r   __classcell__r   r   r   r   r      sD    	
" 













	

	


r   )r   
__future__r   r2   botocore.serializer   viner   !kombu.asynchronous.aws.connectionr   kombu.asynchronous.aws.extr   extr   messager	   rX   r
   __all__r   r   r   r   r   <module>   s    