o
    DfV                     @   s   d Z ddlZddlmZmZ ddlmZmZmZ ddlm	Z	 ddl
mZ ddlmZ 								dd	d
Z						dddZdefddZG dd de	Zdd ZdS )zUseful mocks for unit testing.    N)datetime	timedelta)AnyMappingSequence)Mock)Celery)	Signature c	                 K   s   |si n|}ddl m}
 ddlm} |p| }td| d}|| |d|_|||d}|j|	 |
|||fdd	\|_|_|_	|||f|_
|S )
z)Create task message in protocol 2 format.r   dumpsuuidTaskMessage-name)idtaskshadow)	callbackserrbackschainjson)
serializer)kombu.serializationr   celeryr   r   headersupdatecontent_typecontent_encodingbodypayload)r   r   argskwargsr   r   r   r   utcoptionsr   r   messageembedr
   r
   U/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/contrib/testing/mocks.pyTaskMessage   s    

r)   c                 K   s~   |si n|}ddl m} ddlm}	 |p|	 }td| d}
i |
_| |||||d|
_|
j| ||
j\|
_|
_	|
_
|
S )z)Create task message in protocol 1 format.r   r   r   r   r   )r   r   r"   r#   r   r   )r   r   r   r   r   r   r!   r   r   r   r    )r   r   r"   r#   r   r   r   r%   r   r   r&   r
   r
   r(   TaskMessage1-   s$   
r*   Tc           	      C   s  |   |jdd}|jdd}|jdd}|r%|  t|d }n|jdd}|r7t|tr7| }|jdd}|rOt|tj	rO|  t|d }|rZt|trZ| }||j
f|j|j|j|rmdd	 |D nd|rwd
d	 |D nd|||d|jS )zCreate task message from :class:`celery.Signature`.

    Example:
        >>> m = task_message_from_sig(app, add.s(2, 2))
        >>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey')
    linkN
link_error	countdown)secondsetaexpiresc                 S      g | ]}t |qS r
   dict.0sr
   r
   r(   
<listcomp>i       z)task_message_from_sig.<locals>.<listcomp>c                 S   r1   r
   r2   r4   r
   r
   r(   r7   j   r8   )r   r"   r#   r   r   r/   r0   r$   )freezer%   popnowr   
isinstancer   	isoformatnumbersRealr   r   r"   r#   )	appsigr$   r)   r   r   r-   r/   r0   r
   r
   r(   task_message_from_sigO   s6   rB   c                   @   s    e Zd ZdZdd Zdd ZdS )_ContextMockzDummy class implementing __enter__ and __exit__.

    The :keyword:`with` statement requires these to be implemented
    in the class, not just the instance.
    c                 C   s   | S Nr
   )selfr
   r
   r(   	__enter__y      z_ContextMock.__enter__c                 G   s   d S rD   r
   )rE   exc_infor
   r
   r(   __exit__|   rG   z_ContextMock.__exit__N)__name__
__module____qualname____doc__rF   rI   r
   r
   r
   r(   rC   r   s    rC   c                  O   s>   t | i |}|t  d |t  d ||j_d|j_|S )z3Mock that mocks :keyword:`with` statement contexts.rF   rI   N)rC   attach_mockrF   return_valuerI   )r"   r#   objr
   r
   r(   ContextMock   s   rQ   )Nr
   NNNNNN)Nr
   NNNN)rM   r>   r   r   typingr   r   r   unittest.mockr   r   r   celery.canvasr	   r)   r*   rB   rC   rQ   r
   r
   r
   r(   <module>   s4    
$
"#