
     iDD                         S r SSKrSSKrSSKrSSKrSSKrSSKrSSKJr  SSK	J
r
  SSKJr  SSKJrJrJrJr  SSKJrJrJrJrJr   " S	 S
5      rg)a	  
Dirty Worker Process

Asyncio-based worker that loads dirty apps and handles requests
from the DirtyArbiter.

Threading Model
---------------
Each dirty worker runs an asyncio event loop in the main thread for:
- Handling connections from the arbiter
- Managing heartbeat updates
- Coordinating task execution

Actual app execution runs in a ThreadPoolExecutor (separate threads):
- The number of threads is controlled by ``dirty_threads`` config (default: 1)
- Each thread can execute one app action at a time
- The asyncio event loop is NOT blocked by task execution

State and Global Objects
------------------------
Apps can maintain persistent state because:

1. Apps are loaded ONCE when the worker starts (in ``load_apps()``)
2. The same app instances are reused for ALL requests
3. App state (instance variables, loaded models, etc.) persists

Example::

    class MLApp(DirtyApp):
        def init(self):
            self.model = load_heavy_model()  # Loaded once, reused
            self.cache = {}                   # Persistent cache

        def predict(self, data):
            return self.model.predict(data)  # Uses loaded model

Thread Safety:
- With ``dirty_threads=1`` (default): No concurrent access, thread-safe by design
- With ``dirty_threads > 1``: Multiple threads share the same app instances,
  apps MUST be thread-safe (use locks, thread-local storage, etc.)

Heartbeat and Liveness
----------------------
The worker sends heartbeat updates to prove it's alive:

1. A dedicated asyncio task (``_heartbeat_loop``) runs independently
2. It updates the heartbeat file every ``dirty_timeout / 2`` seconds
3. Since tasks run in executor threads, they do NOT block heartbeats
4. The arbiter kills workers that miss heartbeat updates

Timeout Control
---------------
Execution timeout is enforced at two levels:

1. **Worker level**: Each task execution has a timeout (``dirty_timeout``).
   If exceeded, the worker returns a timeout error but the thread may
   continue running (Python threads cannot be cancelled).

2. **Arbiter level**: The arbiter also enforces timeout when waiting
   for worker response. Workers that don't respond are killed via SIGABRT.

Note: Since Python threads cannot be forcibly cancelled, a truly stuck
operation will continue until the worker is killed by the arbiter.
    N)util)	WorkerTmp   )load_dirty_apps)DirtyAppErrorDirtyAppNotFoundErrorDirtyTimeoutErrorDirtyWorkerError)DirtyProtocolmake_responsemake_error_responsemake_chunk_messagemake_end_messagec                       \ rS rSrSrSR                  5        V VVVs/ s H  n[        [        SU-  5      PM     snnnn rS r	S r
S rS rS	 rS
 rS rS rS rS rS rS rS rS rS rS rS rSrgs  snnnn f )DirtyWorker`   z
Dirty worker process that loads dirty apps and handles requests.

Each worker runs its own asyncio event loop and listens on a
worker-specific Unix socket for requests from the DirtyArbiter.
zABRT HUP QUIT INT TERM USR1zSIG%sc                     Xl         SU l        X l        X0l        X@l        XPl        X`l        SU l        SU l        SU l	        [        U5      U l        0 U l        SU l        SU l        SU l        g)z
Initialize a dirty worker.

Args:
    age: Worker age (for identifying workers)
    ppid: Parent process ID
    app_paths: List of dirty app import paths
    cfg: Gunicorn config
    log: Logger
    socket_path: Path to this worker's Unix socket
z	[booting]FTN)agepidppid	app_pathscfglogsocket_pathbootedabortedaliver   tmpapps_server_loop	_executor)selfr   r   r   r   r   r   s          G/var/www/ias/venv/lib/python3.13/site-packages/gunicorn/dirty/worker.py__init__DirtyWorker.__init__k   sf     	"&
S>	
    c                 "    SU R                    S3$ )Nz<DirtyWorker >)r   r#   s    r$   __str__DirtyWorker.__str__   s    txxj**r'   c                 8    U R                   R                  5         g)zUpdate heartbeat timestamp.N)r   notifyr*   s    r$   r.   DirtyWorker.notify   s    r'   c                    U R                   R                  (       a@  U R                   R                  R                  5        H  u  pU[        R                  U'   M     [
        R                  " U R                   R                  U R                   R                  U R                   R                  S9  [
        R                  " 5         [
        R                  " U R                  R                  5       5        U R                  R                  5         U R                  5         U R!                  5         [        R"                  " 5       U l        U R                   R'                  U 5        SU l        U R+                  5         g)z
Initialize the worker process after fork.

This is called in the child process after fork. It sets up
the environment, loads apps, and starts the main run loop.
)
initgroupsTN)r   envitemsosenvironr   set_owner_processuidgidr1   seedclose_on_execr   filenor   init_signals	load_appsgetpidr   dirty_worker_initr   run)r#   kvs      r$   init_processDirtyWorker.init_process   s     88<<**, !

1 - 	txx||TXX\\*.((*=*=	? 			 	488??,-  	 	 99;""4( 
r'   c                 J   U R                    H(  n[        R                  " U[        R                  5        M*     [        R                  " [        R                  U R                  5        [        R                  " [        R
                  U R                  5        [        R                  " [        R                  U R                  5        [        R                  " [        R                  U R                  5        [        R                  " [        R                  U R                  5        g)zSet up signal handlers.N)	SIGNALSsignalSIG_DFLSIGTERM_signal_handlerSIGQUITSIGINTSIGABRTSIGUSR1)r#   sigs     r$   r<   DirtyWorker.init_signals   s     <<CMM#v~~.   	fnnd&:&:;fnnd&:&:;fmmT%9%9: 	fnnd&:&:; 	fnnd&:&:;r'   c                     U[         R                  :X  a  U R                  R                  5         gSU l        U R
                  (       a&  U R
                  R                  U R                  5        gg)z(Handle signals by setting alive = False.NF)rG   rN   r   reopen_filesr   r!   call_soon_threadsafe	_shutdown)r#   rO   frames      r$   rJ   DirtyWorker._signal_handler   sJ    &.. HH!!#
::JJ++DNN; r'   c                 \    U R                   (       a  U R                   R                  5         gg)zInitiate async shutdown.N)r    closer*   s    r$   rT   DirtyWorker._shutdown   s    <<LL  r'   c                     [        U R                  5      U l        U R                  R                  5        HN  u  pU R                  R                  SU5         UR                  5         U R                  R                  SU5        MP     g! [         a"  nU R                  R                  SX5        e SnAff = f! [         a"  nU R                  R                  SU5        e SnAff = f)zLoad all configured dirty apps.zLoaded dirty app: %szInitialized dirty app: %sz%Failed to initialize dirty app %s: %sNzFailed to load dirty apps: %s)
r   r   r   r3   r   debuginitinfo	Exceptionerrorr#   pathappes       r$   r=   DirtyWorker.load_apps   s    	'7DI!YY__.	5t<HHJHHMM"=tD	 /
 ! HHNN#J#',  	HHNN:A>	s<   AB8 ,B	B8 	
B5B00B55B8 8
C$CC$c                     SSK Jn  U R                  R                  nU" USU R                   S3S9U l        U R                  R                  SU5         [        R                  " 5       U l
        [        R                  " U R                  5        U R                  R                  U R                  5       5        U R!                  5         g! [         a&  nU R                  R                  SU5         SnAN<SnAff = f! U R!                  5         f = f)	z Run the main asyncio event loop.r   )ThreadPoolExecutorzdirty-worker--)max_workersthread_name_prefixz#Created thread pool with %d threadszWorker error: %sN)concurrent.futuresrf   r   dirty_threadsr   r"   r   r[   asyncionew_event_loopr!   set_event_looprun_until_complete
_run_asyncr^   r_   _cleanup)r#   rf   num_threadsrc   s       r$   r@   DirtyWorker.run   s     	: hh,,+#!.txxj:
 	<kJ	 //1DJ""4::.JJ))$//*;< MMO  	2HHNN-q11	2 MMOs+   A#C 
C8C3.C; 3C88C; ;Dc                   #    [         R                  R                  U R                  5      (       a   [         R                  " U R                  5        [
        R                  " U R                  U R                  S9I Sh  vN U l        [         R                  " U R                  S5        U R                  R                  SU R                  U R                  5        [
        R                  " U R                  5       5      n U R                   ISh  vN   U R                  R                  5       I Sh  vN   SSS5      ISh  vN   UR#                  5          UI Sh  vN   g N NT N4 N&! , ISh  vN  (       d  f       N;= f! [
        R                    a     NTf = f NA! [
        R                    a     gf = f! UR#                  5          UI Sh  vN    f ! [
        R                    a     f f = f= f7f)z6Main async loop - start server and handle connections.)ra   Ni  zDirty worker %s listening on %s)r4   ra   existsr   unlinkrl   start_unix_serverhandle_connectionr    chmodr   r]   r   create_task_heartbeat_loopserve_foreverCancelledErrorcancel)r#   heartbeat_tasks     r$   rp   DirtyWorker._run_async   sw     77>>$**++IId&&' %66""!!
 
 	!!5)7hh 0 0	2 !,,T-A-A-CD
	|||ll00222 $|
 !!#$$$-
 $2 $|||%% 		
 %))  !!#$$$)) s  A;G4=E!>A?G4>F E#F E)0E%1E)5F  E'F G4F FF  G4#F %E)'F )F /E20F <F ?F9  F FF9 FF9 F F63G45F66G49G1GGGG1G.+G1-G..G11G4c                    #    U R                   (       aY  U R                  5         [        R                  " U R                  R
                  S-  5      I Sh  vN   U R                   (       a  MX  gg N7f)zPeriodically update heartbeat.g       @N)r   r.   rl   sleepr   dirty_timeoutr*   s    r$   r{   DirtyWorker._heartbeat_loop  sD     jjKKM-- 6 6 <=== jjj=s   AA0A.A0,A0c                   #    U R                   R                  S5         U R                  (       aK   [        R                  " U5      I Sh  vN nU R                  X25      I Sh  vN   U R                  (       a  MK  UR                  5          UR                  5       I Sh  vN   g NZ! [
        R                   a     MC  f = f N^! [         a&  nU R                   R                  SU5         SnANtSnAff = f NW! [         a     gf = f! UR                  5          UR                  5       I Sh  vN    f ! [         a     f f = f= f7f)zT
Handle a connection from the arbiter.

Each connection can send multiple requests.
zNew connection from arbiterNzConnection error: %s)r   r[   r   r   read_message_asyncrl   IncompleteReadErrorhandle_requestr^   r_   rX   wait_closed)r#   readerwritermessagerc   s        r$   rx   DirtyWorker.handle_connection   s     	45	**$1$D$DV$LLG ))'::: *** LLN((*** M22 
 ; 	6HHNN1155	6
 +  LLN((*** s   EC B' 
B%B' C #C$C ;EC9 C7 C9 $E%B' 'B?;C <D	 >B??C 
C4C/*D	 /C44D	 7C9 9
DEDE	ED5.D1/D54E5
E?EEEEc                 N  #    UR                  S[        [        R                  " 5       5      5      nUR                  S5      nU[        R
                  :w  a7  [        U[        SU 35      5      n[        R                  " X%5      I Sh  vN   gUR                  S5      nUR                  S5      nUR                  S/ 5      nUR                  S0 5      n	U R                  5          U R                  XgX5      I Sh  vN n
[        R                  " U
5      (       a  U R                  X:U5      I Sh  vN   g[        R                  " U
5      (       a  U R                  X:U5      I Sh  vN   g[!        X:5      n[        R                  " X%5      I Sh  vN   g GN N Nj N6 N! ["         ay  n[$        R&                  " 5       nU R(                  R+                  S	XgX5        [        U[-        [        U5      XgUS
95      n[        R                  " X%5      I Sh  vN     SnAgSnAff = f7f)a)  
Handle a single request message.

Supports both regular (non-streaming) and streaming responses.
For streaming, detects if the result is a generator and sends
chunk messages followed by an end message.

Args:
    message: Request dict from protocol
    writer: StreamWriter for sending responses
idtypezUnknown message type: Napp_pathactionargskwargszError executing %s.%s: %s
%s)r   r   	traceback)getstruuiduuid4r   MSG_TYPE_REQUESTr   r
   write_message_asyncr.   executeinspectisgenerator_stream_sync_generator
isasyncgen_stream_async_generatorr   r^   r   
format_excr   r_   r   )r#   r   r   
request_idmsg_typeresponser   r   r   r   resultrc   tbs                r$   r   DirtyWorker.handle_request;  s     [[s4::<'89
;;v&}555* #9(!DEH  33FEEE;;z*X&{{62&Xr* 		F<<$GGF ""6**11*fMMM##F++22:vNNN )<#77III- F H NN J 		F%%'BHHNN:#Q4*c!fx(*,H
  33FEEE		Fs   BH%FAH%$F 9F:4F .F/F 3H%40F $F%F )H%*$F FF H%F F F F 
H")A)HHHH%H""H%c           	        ^^
#    [        5       m
U
U4S jn [        R                  " 5       n UR                  U R                  U5      I Sh  vN nUT
L a  O:[
        R                  " U[        X5      5      I Sh  vN   U R                  5         Md  [
        R                  " U[        U5      5      I Sh  vN   TR%                  5         g N} NQ N! [         aw  n[        R                  " 5       nU R                  R                  SXx5        [        U[!        [#        U5      US95      n	[
        R                  " X95      I Sh  vN     SnANSnAff = f! TR%                  5         f = f7f)z
Stream chunks from a synchronous generator.

Args:
    request_id: Request ID for the messages
    gen: Sync generator to iterate
    writer: StreamWriter for sending messages
c                  @   >  [        T5      $ ! [         a    T s $ f = fN)nextStopIteration)
_EXHAUSTEDgens   r$   	_get_next5DirtyWorker._stream_sync_generator.<locals>._get_next~  s'    "Cy   "!!"s   
 NError during streaming: %s
%sr   )objectrl   get_running_looprun_in_executorr"   r   r   r   r.   r   r^   r   r   r   r_   r   r   r   rX   )r#   r   r   r   r   loopchunkrc   r   r   r   s     `       @r$   r   "DirtyWorker._stream_sync_generatorq  s2     X
	"	++-D"224>>9MMJ&#77.zA      33(4   IIK/ N  	F%%'BHHNN;QC*c!f3H  33FEEE	F IIKs}   E)5C C
-C :C;9C 4C5C 9E)
C C C 
EA'EEEE EE E&&E)c           	      l  #     U  Sh  vN n[         R                  " U[        X5      5      I Sh  vN   U R                  5         MC   N> N
 [         R                  " U[	        U5      5      I Sh  vN    O! [
         aw  n[        R                  " 5       nU R                  R                  SXV5        [        U[        [        U5      US95      n[         R                  " X75      I Sh  vN     SnAOSnAff = fUR                  5       I Sh  vN    g! UR                  5       I Sh  vN    f = f7f)z
Stream chunks from an asynchronous generator.

Args:
    request_id: Request ID for the messages
    gen: Async generator to iterate
    writer: StreamWriter for sending messages
Nr   r   )r   r   r   r.   r   r^   r   r   r   r_   r   r   r   aclose)r#   r   r   r   r   rc   r   r   s           r$   r   #DirtyWorker._stream_async_generator  s     	" e#77.zA     #  33(4    	F%%'BHHNN;QC*c!f3H  33FEEE	F **,#**,s   D4A8 AA	A$A8 AA8 	AA8 $A8 1A42A8 7D 8
C9A'C4)C,*C4/D 4C99D <D4DD4D1*D-+D11D4c           	        ^^^^#    XR                   ;  a  [        U5      eU R                   U   mU R                  R                  S:  a  U R                  R                  OSn[        R
                  " 5       n [        R                  " UR                  U R                  UUUU4S j5      US9I Sh  vN nU$  N! [        R                   a0    U R                  R                  SUTU5        [        SU ST S3US9ef = f7f)	a  
Execute an action on a dirty app.

The action runs in a thread pool executor to avoid blocking the
asyncio event loop. Execution timeout is enforced using
``dirty_timeout`` config.

Args:
    app_path: Import path of the dirty app
    action: Action name to execute
    args: Positional arguments
    kwargs: Keyword arguments

Returns:
    Result from the app action

Raises:
    DirtyAppNotFoundError: If app is not loaded
    DirtyTimeoutError: If execution exceeds timeout
    DirtyAppError: If execution fails
r   Nc                     > T" T /TQ70 TD6$ r    )r   rb   r   r   s   r$   <lambda>%DirtyWorker.execute.<locals>.<lambda>  s    C888r'   )timeoutz%Execution timeout for %s.%s after %dszExecution of .z
 timed out)r   r   r   r   rl   r   wait_forr   r"   TimeoutErrorr   warningr	   )	r#   r   r   r   r   r   r   r   rb   s	     ```   @r$   r   DirtyWorker.execute  s     , 99$'11ii!,0HH,B,BQ,F$((((D '')	"++$$NN8   F M ## 		HH7&' $z6(*= 		s1   A1D89B: 1B82B: 7D8B: :AC>>Dc                    U R                   (       a!  U R                   R                  SSS9  SU l         U R                  R                  5        H2  u  p UR	                  5         U R
                  R                  SU5        M4      U R                  R	                  5          [        R                  R                  U R                  5      (       a   [        R                  " U R                  5        U R
                  R                  SU R                   5        g! [         a'  nU R
                  R                  SX5         SnAM  SnAff = f! [         a     Nf = f! [         a     Nwf = f)zClean up resources on shutdown.FT)waitcancel_futuresNzClosed dirty app: %szError closing dirty app %s: %szDirty worker %s exiting)r"   shutdownr   r3   rX   r   r[   r^   r_   r   r4   ra   ru   r   rv   r]   r   r`   s       r$   rq   DirtyWorker._cleanup  s    >>NN##t#D!DN *IDJ		5t< +	HHNN
	ww~~d..//		$**+ 	/:!  J?IIJ  		  		s=   ,DE AE 
EE  E
EE
E%$E%)r"   r!   r    r   r   r   r   r   r   r   r   r   r   r   r   N)__name__
__module____qualname____firstlineno____doc__splitgetattrrG   rF   r%   r+   r.   rC   r<   rJ   rT   r=   r@   rp   r{   rx   r   r   r   r   rq   __static_attributes__).0xr   rG   s   0000r$   r   r   `   s     -22464 12wvw{+46G8+"H<"<!
",B>64Fl.`B2h;]6s   A7
r   )r   rl   r   r4   rG   r   r   gunicornr   gunicorn.workers.workertmpr   rb   r   errorsr   r   r	   r
   protocolr   r   r   r   r   r   r   r'   r$   <module>r      sH   
?B   	     0    r; r;r'   