o
    Df4                     @  sZ  d Z ddlmZ ddlZddlZddlZddlmZ ddlmZm	Z	m
Z
mZmZ ddlmZmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZmZ ddlmZ ddlmZ ddlmZm Z  ddl!m"Z" ddl#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0 dgZ1d$ddZ2d%ddZ3G dd dZ4G dd  d Z5d&d!d"Z6G d#d dZ7dS )'z
Telnet server.
    )annotationsN)get_running_loop)AnyCallable	CoroutineTextIOcast)create_app_sessionget_app)run_in_terminal)Size)AnyFormattedTextto_formatted_text)	PipeInputcreate_pipe_input)Vt100_Output)print_formatted_text)	BaseStyle
DummyStyle   )logger)DOECHOIACLINEMODEMODENAWSSBSESENDSUPPRESS_GO_AHEADTTYPEWILLTelnetProtocolParserTelnetServernumberintreturnbytesc                 C  s
   t | fS N)r(   )r%    r*   ]/home/ubuntu/webapp/venv/lib/python3.10/site-packages/prompt_toolkit/contrib/telnet/server.pyint2byte,      
r,   
connectionsocket.socketNonec                 C  s   t d | tt t  | tt t  | tt t t	 t
d t t  | tt t  | tt t  | tt t  | tt t t t t  d S )NzInitializing telnet connectionr   )r   infosendr   r   r   r"   r    r   r   r,   r   r   r   r!   r   r.   r*   r*   r+   _initialize_telnet0   s   
&"r4   c                   @  s^   e Zd ZdZddd	ZdddZdddZdddZdddZe	dddZ
e	dddZdS )_ConnectionStdoutzk
    Wrapper around socket which provides `write` and `flush` methods for the
    Vt100_Output output.
    r.   r/   encodingstrr'   r0   c                 C  s"   || _ || _d| _g | _d| _d S )NstrictF)	_encoding_connection_errors_buffer_closed)selfr.   r6   r*   r*   r+   __init__S   s
   
z_ConnectionStdout.__init__datac                 C  s2   | dd}| j|j| j| jd |   d S )N
z
)errors)replacer<   appendencoder9   r;   flushr>   r@   r*   r*   r+   writeZ   s   z_ConnectionStdout.writeboolc                 C  s   dS NTr*   r>   r*   r*   r+   isatty_   s   z_ConnectionStdout.isattyc              
   C  s^   z| j s| jd| j W n ty) } ztd|  W Y d }~nd }~ww g | _d S )N    z Couldn't send data over socket: )r=   r:   r2   joinr<   OSErrorr   warning)r>   er*   r*   r+   rF   b   s   
z_ConnectionStdout.flushc                 C  s
   d| _ d S rJ   )r=   rK   r*   r*   r+   closek   r-   z_ConnectionStdout.closec                 C     | j S r)   )r9   rK   r*   r*   r+   r6   n      z_ConnectionStdout.encodingc                 C  rS   r)   )r;   rK   r*   r*   r+   rB   r   rT   z_ConnectionStdout.errorsN)r.   r/   r6   r7   r'   r0   )r@   r7   r'   r0   )r'   rI   r'   r0   )r'   r7   )__name__
__module____qualname____doc__r?   rH   rL   rF   rR   propertyr6   rB   r*   r*   r*   r+   r5   M   s    




	r5   c                   @  sd   e Zd ZdZ	d,d-ddZd.ddZd/ddZd.ddZd0d!d"Zd0d#d$Z	d1d'd(Z
d.d)d*Zd+S )2TelnetConnectionz6
    Class that represents one Telnet connection.
    Tconnr/   addrtuple[str, int]interact7Callable[[TelnetConnection], Coroutine[Any, Any, None]]serverr$   r6   r7   styleBaseStyle | Nonevt100_inputr   
enable_cprrI   r'   r0   c	                   s   |_ |_|_|_|_|_d_t _	|_
 _d _tddd_t| dfddttt||d	_dfdd}	dfdd}
d fdd}t|	|
|_d _d S )NF(   O   rowscolumnsr'   r   c                     s    j S r)   )sizer*   rK   r*   r+   get_size   s   z+TelnetConnection.__init__.<locals>.get_size)r6   r@   r(   r0   c                   s    j |  dS )z-TelnetProtocolParser 'data_received' callbackN)rd   
send_bytesr@   rK   r*   r+   data_received   s   z0TelnetConnection.__init__.<locals>.data_receivedri   r&   rj   c                   s:   t | |d _ jdur jr jdd  dS dS dS )z-TelnetProtocolParser 'size_received' callbackrh   Nc                   S  s
   t   S r)   )r
   
_on_resizer*   r*   r*   r+   <lambda>      
 zBTelnetConnection.__init__.<locals>.size_received.<locals>.<lambda>)r   rk   vt100_outputcontextrunrh   rK   r*   r+   size_received   s   z0TelnetConnection.__init__.<locals>.size_receivedttyper7   c                   s"   t j|  d_j  dS )z.TelnetProtocolParser 'ttype_received' callback)termre   N)r   stdoutrs   _readyset)rw   re   rl   r>   r*   r+   ttype_received   s   
z1TelnetConnection.__init__.<locals>.ttype_received)r'   r   r@   r(   r'   r0   )ri   r&   rj   r&   r'   r0   )rw   r7   r'   r0   )r\   r]   r_   ra   r6   rb   r=   asyncioEventrz   rd   re   rs   r   rk   r4   r   r   r5   ry   r#   parserrt   )r>   r\   r]   r_   ra   r6   rb   rd   re   ro   rv   r}   r*   r|   r+   r?   |   s(   

zTelnetConnection.__init__c                   s   d fdd}t  }| j| z; j I dH  t j jd t	  _
  I dH  W d   n1 s;w   Y  W    dS W    dS    w )z"
        Run application.
        r'   r0   c                    s<    j d} | r |  d S tdj j     d S )Ni   z&Connection closed by client. {!r} {!r})r\   recvfeedr   r1   formatr]   rR   rn   rK   r*   r+   handle_incoming_data   s
   z>TelnetConnection.run_application.<locals>.handle_incoming_dataN)inputoutputrU   )r   
add_readerr\   rz   waitr	   rd   rs   contextvarscopy_contextrt   r_   rR   )r>   r   loopr*   rK   r+   run_application   s   

z TelnetConnection.run_applicationr@   r(   c                 C  s   | j | dS )zF
        Handler for incoming data. (Called by TelnetServer.)
        N)r   r   rG   r*   r*   r+   r      s   zTelnetConnection.feedc                 C  s@   | j sd| _ | j  t | j | j  | j  dS dS )z#
        Closed by client.
        TN)r=   rd   rR   r   remove_readerr\   ry   rK   r*   r*   r+   rR      s   

zTelnetConnection.closeformatted_textr   c                 C  s0   | j du rdS t|}t| j || jpt  dS )z*
        Send text to the client.
        N)rs   r   r   rb   r   r>   r   r*   r*   r+   r2      s   
zTelnetConnection.sendc                   s   t    fddS )z\
        Send text to the client.
        This is asynchronous, returns a `Future`.
        c                     s
     S r)   )r2   r*   r   r>   r*   r+   rq      rr   z4TelnetConnection.send_above_prompt.<locals>.<lambda>)r   _run_in_terminalr   r*   r   r+   send_above_prompt   s   z"TelnetConnection.send_above_promptfuncCallable[[], None]c                 C  s    | j r| j t| d S td)Nz2Called _run_in_terminal outside `run_application`.)rt   ru   r   RuntimeError)r>   r   r*   r*   r+   r      s   z!TelnetConnection._run_in_terminalc                 C  s4   | j du rdS | j   | j dd | j   dS )zB
        Erase the screen and move the cursor to the top.
        Nr   )rs   erase_screencursor_gotorF   rK   r*   r*   r+   r      s
   

zTelnetConnection.erase_screenN)T)r\   r/   r]   r^   r_   r`   ra   r$   r6   r7   rb   rc   rd   r   re   rI   r'   r0   rU   r~   )r   r   r'   r0   )r   r   r'   r0   )rV   rW   rX   rY   r?   r   r   rR   r2   r   r   r   r*   r*   r*   r+   r[   w   s    
7




r[   c                   s   d S r)   r*   r3   r*   r*   r+   _dummy_interact  s   r   c                   @  s`   e Zd ZdZddedddfd$ddZed%ddZd&d'ddZd(ddZ	d(dd Z
d)d"d#ZdS )*r$   a  
    Telnet server implementation.

    Example::

        async def interact(connection):
            connection.send("Welcome")
            session = PromptSession()
            result = await session.prompt_async(message="Say something: ")
            connection.send(f"You said: {result}
")

        async def main():
            server = TelnetServer(interact=interact, port=2323)
            await server.run()
    z	127.0.0.1   zutf-8NThostr7   portr&   r_   r`   r6   rb   rc   re   rI   r'   r0   c                 C  s<   || _ || _|| _|| _|| _|| _d | _g | _t | _	d S r)   )
r   r   r_   r6   rb   re   	_run_task_application_tasksr{   connections)r>   r   r   r_   r6   rb   re   r*   r*   r+   r?     s   zTelnetServer.__init__r/   c                 C  s>   t  t jt j}|t jt jd |||f |d |S )Nr      )socketAF_INETSOCK_STREAM
setsockopt
SOL_SOCKETSO_REUSEADDRbindlisten)clsr   r   sr*   r*   r+   _create_socket2  s
   
zTelnetServer._create_socketready_cbCallable[[], None] | Nonec                   s     j jtd j j t  fdd |r#|  z4t I dH  W t 	 
   jD ]}|  q9t jdkrVtj jdtjdI dH  dS dS t 	 
   jD ]}|  qet jdkrtj jdtjdI dH  w w )z
        Run the telnet server, until this gets cancelled.

        :param ready_cb: Callback that will be called at the point that we're
            actually listening.
        z.Listening for telnet connections on %s port %rc                     s
     S r)   )_acceptr*   r>   r   r*   r+   rq   H  rr   z"TelnetServer.run.<locals>.<lambda>Nr   )timeoutreturn_when)r   r   r   r   r1   r   r   r   Futurer   rR   r   cancellenr   ALL_COMPLETED)r>   r   tr*   r   r+   ru   <  s@   




zTelnetServer.runc                 C  s$   | j durdS t |  | _ dS )z}
        Deprecated: Use `.run()` instead.

        Start the telnet server (stop by calling and awaiting `stop()`).
        N)r   r   create_taskru   rK   r*   r*   r+   startc  s   
zTelnetServer.startc                   sD   | j dur | j   z	| j I dH  W dS  tjy   Y dS w dS )z
        Deprecated: Use `.run()` instead.

        Stop a telnet server that was started using `.start()` and wait for the
        cancellation to complete.
        N)r   r   r   CancelledErrorrK   r*   r*   r+   stopo  s   

zTelnetServer.stoplisten_socketc                   sP   |  \ tjdg R   d fdd}t | j dS )z1
        Accept new incoming connection.
        zNew connection %r %rr'   r0   c                    s  zz]t  P} t jjj| jd}j| tj	dg R   z|
 I d H  W j| tj	dg R   nj| tj	dg R   w W d    n1 sYw   Y  W n? tym   t	d Y n; tyz   t	d Y n7 ty } ztdt|j | dd l}|  W Y d }~n d }~ww W j d S W j d S W j d S W j d S j w )N)r6   rb   rd   re   zStarting interaction %r %rzStopping interaction %r %rz)Unhandled EOFError in telnet application.z2Unhandled KeyboardInterrupt in telnet application.zGot r   )r   r[   r_   r6   rb   re   r   addr   r1   r   removeEOFErrorKeyboardInterruptBaseExceptionprinttyperV   	traceback	print_excr   )rd   r.   rQ   r   r]   r\   r>   taskr*   r+   ru     sR   

z!TelnetServer._accept.<locals>.runNrU   )acceptr   r1   r   r   r   rD   )r>   r   ru   r*   r   r+   r   }  s
   &zTelnetServer._accept)r   r7   r   r&   r_   r`   r6   r7   rb   rc   re   rI   r'   r0   )r   r7   r   r&   r'   r/   r)   )r   r   r'   r0   rU   )r   r/   r'   r0   )rV   rW   rX   rY   r   r?   classmethodr   ru   r   r   r   r*   r*   r*   r+   r$   
  s    	
'
)r%   r&   r'   r(   )r.   r/   r'   r0   )r.   r[   r'   r0   )8rY   
__future__r   r   r   r   r   typingr   r   r   r   r   "prompt_toolkit.application.currentr	   r
   *prompt_toolkit.application.run_in_terminalr   prompt_toolkit.data_structuresr   prompt_toolkit.formatted_textr   r   prompt_toolkit.inputr   r   prompt_toolkit.output.vt100r   prompt_toolkit.rendererr   prompt_toolkit.stylesr   r   logr   protocolr   r   r   r   r   r   r   r   r   r    r!   r"   r#   __all__r,   r4   r5   r[   r   r$   r*   r*   r*   r+   <module>   s4    <

* 
