o
    Df(                  	   @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	m
Z
 dddhd	d
ddddZG dd dZG dd dee	jZddddeddfddZedd ZedddZdS )z-Create Celery app instances used for testing.    N)contextmanager)deepcopy)symbol_by_name)Celery_stateFjsonTUTCz	memory://zcache+memory://)worker_hijack_root_loggerworker_log_coloraccept_content
enable_utctimezone
broker_urlresult_backendbroker_heartbeatc                   @   s   e Zd ZdZdd ZdS )TrapzTrap that pretends to be an app but raises an exception instead.

    This to protect from code that does not properly pass app instances,
    then falls back to the current_app.
    c                 C   s$   |dks|dkr
d S t | td)N_is_coroutine__func__zTest depends on current_app)printRuntimeError)selfname r   S/home/ubuntu/webapp/venv/lib/python3.10/site-packages/celery/contrib/testing/app.py__getattr__   s   zTrap.__getattr__N)__name__
__module____qualname____doc__r   r   r   r   r   r      s    r   c                       s    e Zd ZdZ fddZ  ZS )UnitLoggingz)Sets up logging for the test application.c                    s   t  j|i | d| _d S )NT)super__init__already_setup)r   argskwargs	__class__r   r   r!   *   s   
zUnitLogging.__init__)r   r   r   r   r!   __classcell__r   r   r%   r   r   '   s    r   c           
      K   s   ddl m} tttfi |pi }|dur|dd |dur&|dd |r*dn|}t| p0df||||d|}	|	| |	S )zApp used for testing.   )tasksNr   r   zcelery.tests)set_as_currentlogbrokerbackend) r)   dictr   DEFAULT_TEST_CONFIGpopr   add_defaults)
r   configenable_loggingr*   r+   r-   r,   r$   r)   test_appr   r   r   TestApp/   s&   
r6   c                 #   sN    t   tj}t  G  fddd}| t_z	dV  W |t_dS |t_w )zContextmanager that installs the trap app.

    The trap means that anything trying to use the current or default app
    will raise an exception.
    c                       s   e Zd Z ZdS )zset_trap.<locals>.NonTLSN)r   r   r   current_appr   trapr   r   NonTLSO   s    r:   N)r   r   _tlsset_default_app)appprev_tlsr:   r   r8   r   set_trapD   s   
r?   c              	   c   s    t  }t j}tt j}tt j}z7|r.t|  dV  W d   n1 s(w   Y  ndV  W t 	| |t j
_| |urC|   |t _|t _dS t 	| |t j
_| |ur\|   |t _|t _w )zWSetup default app for testing.

    Ensures state is clean after the test returns.
    N)r   get_current_appdefault_appset_on_app_finalizersweakrefWeakSet_appsr?   r<   r;   r7   close)r=   use_trapprev_current_appprev_default_appprev_finalizers	prev_appsr   r   r   setup_default_appY   s2   




rM   )F)r   rD   
contextlibr   copyr   kombu.utils.importsr   celeryr   r   r0   r   log_clsr   r6   r?   rM   r   r   r   r   <module>   s0    

