JFIFXX    $.' ",#(7),01444'9=82<.342  2!!22222222222222222222222222222222222222222222222222"4 ,PG"Z_4˷kjزZ,F+_z,© zh6٨icfu#ډb_N?wQ5-~I8TK<5oIv-k_U_~bMdӜUHh?]EwQk{_}qFW7HTՑYF?_'ϔ_Ջt=||I 6έ"D/[k9Y8ds|\Ҿp6Ҵ].6znopM[mei$[soᘨ˸ nɜG-ĨUycP3.DBli;hjx7Z^NhN3u{:jx힞#M&jL P@_ P&o89@Sz6t7#Oߋ s}YfTlmrZ)'Nk۞pw\Tȯ?8`Oi{wﭹW[r Q4F׊3m&L=h3z~#\l :F,j@ ʱwQT8"kJO6֚l}R>ډK]y&p}b;N1mr$|7>e@BTM*-iHgD) Em|ؘbҗaҾt4oG*oCNrPQ@z,|?W[0:n,jWiEW$~/hp\?{(0+Y8rΟ+>S-SVN;}s?. w9˟<Mq4Wv'{)01mBVW[8/< %wT^5b)iM pgN&ݝVO~qu9 !J27$O-! :%H ـyΠM=t{!S oK8txA& j0 vF Y|y ~6@c1vOpIg4lODL Rcj_uX63?nkWyf;^*B @~a`Eu+6L.ü>}y}_O6͐:YrGXkGl^w~㒶syIu! W XN7BVO!X2wvGRfT#t/?%8^WaTGcLMI(J1~8?aT ]ASE(*E} 2#I/׍qz^t̔bYz4xt){ OH+(EA&NXTo"XC')}Jzp ~5}^+6wcQ|LpdH}(.|kc4^"Z?ȕ a<L!039C EuCFEwç ;n?*oB8bʝ'#RqfM}7]s2tcS{\icTx;\7KPʇ Z O-~c>"?PEO8@8GQgaՎ󁶠䧘_%#r>1zaebqcPѵn#L =׀t L7`VA{C:ge@w1 Xp3c3ġpM"'-@n4fGB3DJ8[JoߐgK)ƛ$ 83+ 6ʻ SkI*KZlT _`?KQKdB`s}>`*>,*@JdoF*弝O}ks]yߘc1GV<=776qPTtXԀ!9*44Tހ3XΛex46YD  BdemDa\_l,G/֌7Y](xTt^%GE4}bTڹ;Y)BQu>J/J ⮶.XԄjݳ+Ed r5_D1 o Bx΢#<W8R6@gM. drD>(otU@x=~v2 ӣdoBd3eO6㣷ݜ66YQz`S{\P~z m5{J/L1xO\ZFu>ck#&:`$ai>2ΔloF[hlEܺΠk:)` $[69kOw\|8}ބ:񶐕IA1/=2[,!.}gN#ub ~݊}34qdELc$"[qU硬g^%B zrpJru%v\h1Yne`ǥ:gpQM~^Xi `S:V29.PV?Bk AEvw%_9CQwKekPؠ\;Io d{ ߞoc1eP\ `E=@KIRYK2NPlLɀ)&eB+ь( JTx_?EZ }@ 6U뙢طzdWIn` D噥[uV"G&Ú2g}&m?ċ"Om# {ON"SXNeysQ@FnVgdX~nj]J58up~.`r\O,ư0oS _Ml4kv\JSdxSW<AeIX$Iw:Sy›R9Q[,5;@]%u@ *rolbI  +%m:͇ZVủθau,RW33 dJeTYE.Mϧ-oj3+yy^cVO9NV\nd1 !͕_)av;թMlWR1)ElP;yوÏu 3k5Pr6<⒲l!˞*u־n!l:UNW %Chx8vL'X@*)̮ˍ D-M+JUkvK+x8cY?Ԡ~3mo|u@[XeYC\Kpx8oCC&N~3-H MXsu<`~"WL$8ξ3a)|:@m\^`@ҷ)5p+6p%i)P Mngc#0AruzRL+xSS?ʮ}()#tmˇ!0}}y$6Lt;$ʳ{^6{v6ķܰgVcnn ~zx«,2u?cE+ȘH؎%Za)X>uWTzNyosFQƤ$*&LLXL)1" LeOɟ9=:tZcŽY?ӭVwv~,Yrۗ|yGaFC.+ v1fήJ]STBn5sW}y$~z'c 8  ,! pVNSNNqy8z˱A4*'2n<s^ǧ˭PJޮɏUGLJ*#i}K%,)[z21z ?Nin1?TIR#m-1lA`fT5+ܐcq՝ʐ,3f2Uեmab#ŠdQy>\)SLYw#.ʑf ,"+w~N'cO3FN<)j&,- љ֊_zSTǦw>?nU仆Ve0$CdrP m׈eXmVu L.bֹ [Դaզ*\y8Է:Ez\0KqC b̘cөQ=0YsNS.3.Oo:#v7[#߫ 5܎LEr49nCOWlG^0k%;YߝZǓ:S#|}y,/kLd TA(AI$+I3;Y*Z}|ӧOdv..#:nf>>ȶITX 8y"dR|)0=n46ⲑ+ra ~]R̲c?6(q;5% |uj~z8R=XIV=|{vGj\gcqz؋%Mߍ1y#@f^^>N#x#۹6Y~?dfPO{P4Vu1E1J *|%JN`eWuzk M6q t[ gGvWIGu_ft5j"Y:Tɐ*; e54q$C2d} _SL#mYpO.C;cHi#֩%+) ӍƲVSYźg |tj38r|V1#;.SQA[S#`n+$$I P\[@s(EDzP])8G#0B[ىXIIq<9~[Z멜Z⊔IWU&A>P~#dp]9 "cP Md?٥Ifتuk/F9c*9Ǎ:ØFzn*@|Iށ9N3{'['ͬҲ4#}!V Fu,,mTIkv C7vB6kT91*l '~ƞFlU'M ][ΩũJ_{iIn$L jOdxkza۪#EClx˘oVɞljr)/,߬hL#^Lф,íMƁe̩NBLiLq}(q6IçJ$WE$:=#(KBzђ xlx?>Պ+>W,Ly!_DŌlQ![ SJ1ƐY}b,+Loxɓ)=yoh@꥟/Iѭ=Py9 ۍYӘe+pJnϱ?V\SO%(t =?MR[Șd/ nlB7j !;ӥ/[-A>dNsLj ,ɪv=1c.SQO3UƀܽE̻9GϷD7(}Ävӌ\y_0[w <΍>a_[0+LF.޺f>oNTq;y\bՃyjH<|q-eɏ_?_9+PHp$[uxK wMwNی'$Y2=qKBP~Yul:[<F12O5=d]Ysw:ϮEj,_QXz`H1,#II dwrP˂@ZJVy$\y{}^~[:NߌUOdؾe${p>G3cĖlʌ ת[`ϱ-WdgIig2 }s ؤ(%#sS@~3XnRG~\jc3vӍLM[JBTs3}jNʖW;7ç?=XF=-=qߚ#='c7ڑWI(O+=:uxqe2zi+kuGR0&eniT^J~\jyp'dtGsO39* b#Ɋ p[BwsT>d4ۧsnvnU_~,vƜJ1s QIz)(lv8MU=;56Gs#KMP=LvyGd}VwWBF'à ?MHUg2 !p7Qjڴ=ju JnA suMeƆҔ!)'8Ϣٔޝ(Vpצ֖d=ICJǠ{qkԭ߸i@Ku|p=..*+xz[Aqġ#s2aƊRR)*HRsi~a &fMP-KL@ZXy'x{}Zm+:)) IJ-iu ܒH'L(7yGӜq j 6ߌg1go,kرtY?W,pefOQS!K۟cҒA|սj>=⬒˧L[ ߿2JaB~Ru:Q] 0H~]7ƼI(}cq 'ήETq?fabӥvr )o-Q_'ᴎoK;Vo%~OK *bf:-ťIR`B5!RB@ï u ̯e\_U_ gES3QTaxU<~c?*#]MW,[8Oax]1bC|踤Plw5V%){t<d50iXSUm:Z┵i"1^B-PhJ&)O*DcWvM)}Pܗ-q\mmζZ-l@}aE6F@&Sg@ݚM ȹ 4#p\HdYDoH"\..RBHz_/5˘6KhJRPmƶim3,#ccoqa)*PtRmk7xDE\Y閣_X<~)c[[BP6YqS0%_;Àv~| VS؇ 'O0F0\U-d@7SJ*z3nyPOm~P3|Yʉr#CSN@ ƮRN)r"C:: #qbY. 6[2K2uǦHYRQMV G$Q+.>nNHq^ qmMVD+-#*U̒ p욳u:IBmPV@Or[b= 1UE_NmyKbNOU}the`|6֮P>\2PVIDiPO;9rmAHGWS]J*_G+kP2KaZH'KxWMZ%OYDRc+o?qGhmdSoh\D|:WUAQc yTq~^H/#pCZTI1ӏT4"ČZ}`w#*,ʹ 0i課Om*da^gJ݅{le9uF#Tֲ̲ٞC"qߍ ոޑo#XZTp@ o8(jdxw],f`~|,s^f1t|m򸄭/ctr5s79Q4H1꠲BB@l9@C+wpxu£Yc9?`@#omHs2)=2.ljg9$YS%*LRY7Z,*=䷘$armoϰUW.|rufIGwtZwo~5 YյhO+=8fF)W7L9lM̘·Y֘YLf큹pRF99.A "wz=E\Z'a 2Ǚ#;'}G*l^"q+2FQ hjkŦ${ޮ-T٭cf|3#~RJt$b(R(rdx >U b&9,>%E\ Άe$'q't*אެb-|dSBOO$R+H)܎K1m`;J2Y~9Og8=vqD`K[F)k[1m޼cn]skz$@)!I x՝"v9=ZA=`Ɠi :E)`7vI}dYI_ o:obo 3Q&D&2= Ά;>hy.*ⅥSӬ+q&j|UƧ}J0WW< ۋS)jQRjƯrN)Gű4Ѷ(S)Ǣ8iW52No˓ ۍ%5brOnL;n\G=^UdI8$&h'+(cȁ߫klS^cƗjԌEꭔgFȒ@}O*;evWVYJ\]X'5ղkFb 6Ro՜mi Ni>J?lPmU}>_Z&KKqrIDՉ~q3fL:Se>E-G{L6pe,8QIhaXaUA'ʂs+טIjP-y8ۈZ?J$WP Rs]|l(ԓsƊio(S0Y 8T97.WiLc~dxcE|2!XKƘਫ਼$((6~|d9u+qd^389Y6L.I?iIq9)O/뚅OXXVZF[یgQLK1RҖr@v#XlFНyS87kF!AsM^rkpjPDyS$Nqnxҍ!Uf!ehi2m`YI9r6 TFC}/y^Η5d'9A-J>{_l+`A['յϛ#w:݅%X}&PStQ"-\縵/$ƗhXb*yBS;Wջ_mcvt?2}1;qSdd~u:2k52R~z+|HE!)Ǟl7`0<,2*Hl-x^'_TVgZA'j ^2ΪN7t?w x1fIzC-ȖK^q;-WDvT78Z hK(P:Q- 8nZ܃e貾<1YT<,"6{/ ?͟|1:#gW>$dJdB=jf[%rE^il:BxSּ1հ,=*7 fcG#q eh?27,!7x6nLC4x},GeǝtC.vS F43zz\;QYC,6~;RYS/6|25vTimlv& nRh^ejRLGf? ۉҬܦƩ|Ȱ>3!viʯ>vオX3e_1zKȗ\qHS,EW[㺨uch⍸O}a>q6n6N6qN ! 1AQaq0@"2BRb#Pr3C`Scst$4D%Td ?Na3mCwxAmqmm$4n淿t'C"wzU=D\R+wp+YT&պ@ƃ3ޯ?AﶂaŘ@-Q=9Dռѻ@MVP܅G5fY6# ?0UQ,IX(6ڵ[DIMNލc&υj\XR|,4 jThAe^db#$]wOӪ1y%LYm뭛CUƃߜ}Cy1XνmF8jI]HۺиE@Ii;r8ӭVFՇ| &?3|xBMuSGe=Ӕ#BE5GY!z_eqр/W>|-Ci߇t1ޯќdR3ug=0 5[?#͏qcfH{ ?u=??ǯ}ZzhmΔBFTWPxs}G93 )gGR<>r h$'nchPBjJҧH -N1N?~}-q!=_2hcMlvY%UE@|vM2.Y[|y"EïKZF,ɯ?,q?vM 80jx";9vk+ ֧ ȺU?%vcVmA6Qg^MA}3nl QRNl8kkn'(M7m9وq%ޟ*h$Zk"$9: ?U8Sl,,|ɒxH(ѷGn/Q4PG%Ա8N! &7;eKM749R/%lc>x;>C:th?aKXbheᜋ^$Iհ hr7%F$EFdt5+(M6tÜUU|zW=aTsTgdqPQb'm1{|YXNb P~F^F:k6"j! Ir`1&-$Bevk:y#ywI0x=D4tUPZHڠ底taP6b>xaQ# WeFŮNjpJ* mQN*I-*ȩFg3 5Vʊɮa5FO@{NX?H]31Ri_uѕ 0 F~:60p͈SqX#a5>`o&+<2D: ڝ$nP*)N|yEjF5ټeihyZ >kbHavh-#!Po=@k̆IEN@}Ll?jO߭ʞQ|A07xwt!xfI2?Z<ץTcUj]陎Ltl }5ϓ$,Omˊ;@OjEj(ا,LXLOЦ90O .anA7j4 W_ٓzWjcBy՗+EM)dNg6y1_xp$Lv:9"zpʙ$^JԼ*ϭo=xLj6Ju82AH3$ٕ@=Vv]'qEz;I˼)=ɯx /W(Vp$ mu񶤑OqˎTr㠚xsrGCbypG1ߠw e8$⿄/M{*}W]˷.CK\ުx/$WPwr |i&}{X >$-l?-zglΆ(FhvS*b߲ڡn,|)mrH[a3ר[13o_U3TC$(=)0kgP u^=4 WYCҸ:vQרXàtkm,t*^,}D* "(I9R>``[~Q]#afi6l86:,ssN6j"A4IuQ6E,GnHzSHOuk5$I4ؤQ9@CwpBGv[]uOv0I4\yQѸ~>Z8Taqޣ;za/SI:ܫ_|>=Z8:SUIJ"IY8%b8H:QO6;7ISJҌAά3>cE+&jf$eC+z;V rʺmyeaQf&6ND.:NTvm<- uǝ\MvZYNNT-A>jr!SnO 13Ns%3D@`ܟ 1^c< aɽ̲Xë#w|ycW=9I*H8p^(4՗karOcWtO\ƍR8'KIQ?5>[}yUײ -h=% qThG2)"ו3]!kB*pFDlA,eEiHfPs5H:Փ~H0DتDIhF3c2E9H5zԑʚiX=:mxghd(v׊9iSOd@0ڽ:p5h-t&Xqӕ,ie|7A2O%PEhtjY1wЃ!  ࢽMy7\a@ţJ 4ȻF@o̒?4wx)]P~u57X 9^ܩU;Iꭆ 5 eK27({|Y׎ V\"Z1 Z}(Ǝ"1S_vE30>p; ΝD%xW?W?vo^Vidr[/&>~`9Why;R ;;ɮT?r$g1KACcKl:'3 cﳯ*"t8~l)m+U,z`(>yJ?h>]vЍG*{`;y]IT ;cNUfo¾h/$|NS1S"HVT4uhǜ]v;5͠x'C\SBplh}N ABx%ޭl/Twʽ]D=Kžr㻠l4SO?=k M: cCa#ha)ѐxcsgPiG{+xQI= zԫ+ 8"kñj=|c yCF/*9жh{ ?4o kmQNx;Y4膚aw?6>e]Qr:g,i"ԩA*M7qB?ӕFhV25r[7 Y }LR}*sg+xr2U=*'WSZDW]WǞ<叓{$9Ou4y90-1'*D`c^o?(9uݐ'PI& fJݮ:wSjfP1F:X H9dԯ˝[_54 }*;@ܨ ðynT?ןd#4rGͨH1|-#MrS3G3).᧏3vz֑r$G"`j 1tx0<ƆWh6y6,œGagAyb)hDß_mü gG;evݝnQ C-*oyaMI><]obD":GA-\%LT8c)+y76oQ#*{(F⽕y=rW\p۩cA^e6KʐcVf5$'->ՉN"F"UQ@fGb~#&M=8טJNu9D[̤so~ G9TtW^g5y$bY'سǴ=U-2 #MCt(i lj@Q 5̣i*OsxKf}\M{EV{υƇ);HIfeLȣr2>WIȂ6ik 5YOxȺ>Yf5'|H+98pjn.OyjY~iw'l;s2Y:'lgꥴ)o#'SaaKZ m}`169n"xI *+ }FP"l45'ZgE8?[X7(.Q-*ތL@̲v.5[=t\+CNܛ,gSQnH}*FG16&:t4ُ"Ạ$b |#rsaT ]ӽDP7ո0y)e$ٕvIh'QEAm*HRI=: 4牢) %_iNݧl] NtGHL ɱg<1V,J~ٹ"KQ 9HS9?@kr;we݁]I!{ @G["`J:n]{cAEVʆ#U96j#Ym\qe4hB7Cdv\MNgmAyQL4uLjj9#44tl^}LnR!t±]rh6ٍ>yҏNfU  Fm@8}/ujb9he:AyծwGpΧh5l}3p468)Udc;Us/֔YX1O2uqs`hwgr~{ RmhN؎*q 42*th>#E#HvOq}6e\,Wk#Xb>p}դ3T5†6[@Py*n|'f֧>lư΂̺SU'*qp_SM 'c6m ySʨ;MrƋmKxo,GmPAG:iw9}M(^V$ǒѽ9| aJSQarB;}ٻ֢2%Uc#gNaݕ'v[OY'3L3;,p]@S{lsX'cjwk'a.}}& dP*bK=ɍ!;3ngΊUߴmt'*{,=SzfD Ako~Gaoq_mi}#mPXhύmxǍ΂巿zfQc|kc?WY$_Lvl߶c`?ljݲˏ!V6UЂ(A4y)HpZ_x>eR$/`^'3qˏ-&Q=?CFVR DfV9{8gnh(P"6[D< E~0<@`G6Hгcc cK.5DdB`?XQ2ٿyqo&+1^ DW0ꊩG#QnL3c/x 11[yxპCWCcUĨ80me4.{muI=f0QRls9f9~fǨa"@8ȁQ#cicG$Gr/$W(WV"m7[mAmboD j۳ l^kh׽ # iXnveTka^Y4BNĕ0 !01@Q"2AaPq3BR?@4QT3,㺠W[=JKϞ2r^7vc:9 EߴwS#dIxu:Hp9E! V 2;73|F9Y*ʬFDu&y؟^EAA(ɩ^GV:ݜDy`Jr29ܾ㝉[E;FzxYGUeYC v-txIsםĘqEb+P\ :>iC';k|zرny]#ǿbQw(r|ӹs[D2v-%@;8<a[\o[ϧwI!*0krs)[J9^ʜp1) "/_>o<1AEy^C`x1'ܣnps`lfQ):lb>MejH^?kl3(z:1ŠK&?Q~{ٺhy/[V|6}KbXmn[-75q94dmc^h X5G-}دBޟ |rtMV+]c?-#ڛ^ǂ}LkrOu>-Dry D?:ޞUǜ7V?瓮"#rչģVR;n/_ ؉vݶe5db9/O009G5nWJpA*r9>1.[tsFnQ V 77R]ɫ8_0<՜IFu(v4Fk3E)N:yڮeP`1}$WSJSQNjٺ޵#lј(5=5lǏmoWv-1v,Wmn߀$x_DȬ0¤#QR[Vkzmw"9ZG7'[=Qj8R?zf\a=OU*oBA|G254 p.w7  &ξxGHp B%$gtЏ򤵍zHNuЯ-'40;_3 !01"@AQa2Pq#3BR?ʩcaen^8F<7;EA{EÖ1U/#d1an.1ě0ʾRh|RAo3m3 % 28Q yφHTo7lW>#i`qca m,B-j݋'mR1Ήt>Vps0IbIC.1Rea]H64B>o]($Bma!=?B KǾ+Ծ"nK*+[T#{EJSQs5:U\wĐf3܆&)IԆwE TlrTf6Q|Rh:[K zc֧GC%\_a84HcObiؖV7H )*ģK~Xhչ04?0 E<}3#u? |gS6ꊤ|I#Hڛ աwX97Ŀ%SLy6č|Fa 8b$sקhb9RAu7˨pČ_\*w묦F 4D~f|("mNKiS>$d7SlA/²SL|6N}S˯g]6; #. 403WebShell
403Webshell
Server IP : 13.127.148.211  /  Your IP : 216.73.216.149
Web Server : Apache/2.4.41 (Ubuntu)
System : Linux ip-172-31-43-195 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 06:59:36 UTC 2025 x86_64
User : www-data ( 33)
PHP Version : 7.4.3-4ubuntu2.29
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python3/dist-packages/twisted/runner/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3/dist-packages/twisted/runner/test/test_procmon.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{twisted.runner.procmon}.
"""
import pickle

from twisted.trial import unittest
from twisted.runner.procmon import LoggingProtocol, ProcessMonitor
from twisted.internet.error import (ProcessDone, ProcessTerminated,
                                    ProcessExitedAlready)
from twisted.internet.task import Clock
from twisted.python.failure import Failure
from twisted.python import log
from twisted.test.proto_helpers import MemoryReactor



class DummyProcess(object):
    """
    An incomplete and fake L{IProcessTransport} implementation for testing how
    L{ProcessMonitor} behaves when its monitored processes exit.

    @ivar _terminationDelay: the delay in seconds after which the DummyProcess
        will appear to exit when it receives a TERM signal
    """

    pid = 1
    proto = None

    _terminationDelay = 1

    def __init__(self, reactor, executable, args, environment, path,
                 proto, uid=None, gid=None, usePTY=0, childFDs=None):

        self.proto = proto

        self._reactor = reactor
        self._executable = executable
        self._args = args
        self._environment = environment
        self._path = path
        self._uid = uid
        self._gid = gid
        self._usePTY = usePTY
        self._childFDs = childFDs


    def signalProcess(self, signalID):
        """
        A partial implementation of signalProcess which can only handle TERM and
        KILL signals.
         - When a TERM signal is given, the dummy process will appear to exit
           after L{DummyProcess._terminationDelay} seconds with exit code 0
         - When a KILL signal is given, the dummy process will appear to exit
           immediately with exit code 1.

        @param signalID: The signal name or number to be issued to the process.
        @type signalID: C{str}
        """
        params = {
            "TERM": (self._terminationDelay, 0),
            "KILL": (0, 1)
        }

        if self.pid is None:
            raise ProcessExitedAlready()

        if signalID in params:
            delay, status = params[signalID]
            self._signalHandler = self._reactor.callLater(
                delay, self.processEnded, status)


    def processEnded(self, status):
        """
        Deliver the process ended event to C{self.proto}.
        """
        self.pid = None
        statusMap = {
            0: ProcessDone,
            1: ProcessTerminated,
        }
        self.proto.processEnded(Failure(statusMap[status](status)))



class DummyProcessReactor(MemoryReactor, Clock):
    """
    @ivar spawnedProcesses: a list that keeps track of the fake process
        instances built by C{spawnProcess}.
    @type spawnedProcesses: C{list}
    """
    def __init__(self):
        MemoryReactor.__init__(self)
        Clock.__init__(self)

        self.spawnedProcesses = []


    def spawnProcess(self, processProtocol, executable, args=(), env={},
                     path=None, uid=None, gid=None, usePTY=0,
                     childFDs=None):
        """
        Fake L{reactor.spawnProcess}, that logs all the process
        arguments and returns a L{DummyProcess}.
        """

        proc = DummyProcess(self, executable, args, env, path,
                            processProtocol, uid, gid, usePTY, childFDs)
        processProtocol.makeConnection(proc)
        self.spawnedProcesses.append(proc)
        return proc



class ProcmonTests(unittest.TestCase):
    """
    Tests for L{ProcessMonitor}.
    """

    def setUp(self):
        """
        Create an L{ProcessMonitor} wrapped around a fake reactor.
        """
        self.reactor = DummyProcessReactor()
        self.pm = ProcessMonitor(reactor=self.reactor)
        self.pm.minRestartDelay = 2
        self.pm.maxRestartDelay = 10
        self.pm.threshold = 10


    def test_reprLooksGood(self):
        """
        Repr includes all details
        """
        self.pm.addProcess("foo", ["arg1", "arg2"],
                           uid=1, gid=2, env={})
        representation = repr(self.pm)
        self.assertIn('foo', representation)
        self.assertIn('1', representation)
        self.assertIn('2', representation)


    def test_simpleReprLooksGood(self):
        """
        Repr does not include unneeded details.

        Values of attributes that just mean "inherit from launching
        process" do not appear in the repr of a process.
        """
        self.pm.addProcess("foo", ["arg1", "arg2"], env={})
        representation = repr(self.pm)
        self.assertNotIn('(', representation)
        self.assertNotIn(')', representation)


    def test_getStateIncludesProcesses(self):
        """
        The list of monitored processes must be included in the pickle state.
        """
        self.pm.addProcess("foo", ["arg1", "arg2"],
                           uid=1, gid=2, env={})
        self.assertEqual(self.pm.__getstate__()['processes'],
                          {'foo': (['arg1', 'arg2'], 1, 2, {})})


    def test_getStateExcludesReactor(self):
        """
        The private L{ProcessMonitor._reactor} instance variable should not be
        included in the pickle state.
        """
        self.assertNotIn('_reactor', self.pm.__getstate__())


    def test_addProcess(self):
        """
        L{ProcessMonitor.addProcess} only starts the named program if
        L{ProcessMonitor.startService} has been called.
        """
        self.pm.addProcess("foo", ["arg1", "arg2"],
                           uid=1, gid=2, env={})
        self.assertEqual(self.pm.protocols, {})
        self.assertEqual(self.pm.processes,
                          {"foo": (["arg1", "arg2"], 1, 2, {})})
        self.pm.startService()
        self.reactor.advance(0)
        self.assertEqual(list(self.pm.protocols.keys()), ["foo"])


    def test_addProcessDuplicateKeyError(self):
        """
        L{ProcessMonitor.addProcess} raises a C{KeyError} if a process with the
        given name already exists.
        """
        self.pm.addProcess("foo", ["arg1", "arg2"],
                           uid=1, gid=2, env={})
        self.assertRaises(KeyError, self.pm.addProcess,
                          "foo", ["arg1", "arg2"], uid=1, gid=2, env={})


    def test_addProcessEnv(self):
        """
        L{ProcessMonitor.addProcess} takes an C{env} parameter that is passed to
        L{IReactorProcess.spawnProcess}.
        """
        fakeEnv = {"KEY": "value"}
        self.pm.startService()
        self.pm.addProcess("foo", ["foo"], uid=1, gid=2, env=fakeEnv)
        self.reactor.advance(0)
        self.assertEqual(
            self.reactor.spawnedProcesses[0]._environment, fakeEnv)


    def test_addProcessCwd(self):
        """
        L{ProcessMonitor.addProcess} takes an C{cwd} parameter that is passed
        to L{IReactorProcess.spawnProcess}.
        """
        self.pm.startService()
        self.pm.addProcess("foo", ["foo"], cwd='/mnt/lala')
        self.reactor.advance(0)
        self.assertEqual(
            self.reactor.spawnedProcesses[0]._path, '/mnt/lala')


    def test_removeProcess(self):
        """
        L{ProcessMonitor.removeProcess} removes the process from the public
        processes list.
        """
        self.pm.startService()
        self.pm.addProcess("foo", ["foo"])
        self.assertEqual(len(self.pm.processes), 1)
        self.pm.removeProcess("foo")
        self.assertEqual(len(self.pm.processes), 0)


    def test_removeProcessUnknownKeyError(self):
        """
        L{ProcessMonitor.removeProcess} raises a C{KeyError} if the given
        process name isn't recognised.
        """
        self.pm.startService()
        self.assertRaises(KeyError, self.pm.removeProcess, "foo")


    def test_startProcess(self):
        """
        When a process has been started, an instance of L{LoggingProtocol} will
        be added to the L{ProcessMonitor.protocols} dict and the start time of
        the process will be recorded in the L{ProcessMonitor.timeStarted}
        dictionary.
        """
        self.pm.addProcess("foo", ["foo"])
        self.pm.startProcess("foo")
        self.assertIsInstance(self.pm.protocols["foo"], LoggingProtocol)
        self.assertIn("foo", self.pm.timeStarted.keys())


    def test_startProcessAlreadyStarted(self):
        """
        L{ProcessMonitor.startProcess} silently returns if the named process is
        already started.
        """
        self.pm.addProcess("foo", ["foo"])
        self.pm.startProcess("foo")
        self.assertIsNone(self.pm.startProcess("foo"))


    def test_startProcessUnknownKeyError(self):
        """
        L{ProcessMonitor.startProcess} raises a C{KeyError} if the given
        process name isn't recognised.
        """
        self.assertRaises(KeyError, self.pm.startProcess, "foo")


    def test_stopProcessNaturalTermination(self):
        """
        L{ProcessMonitor.stopProcess} immediately sends a TERM signal to the
        named process.
        """
        self.pm.startService()
        self.pm.addProcess("foo", ["foo"])
        self.assertIn("foo", self.pm.protocols)

        # Configure fake process to die 1 second after receiving term signal
        timeToDie = self.pm.protocols["foo"].transport._terminationDelay = 1

        # Advance the reactor to just before the short lived process threshold
        # and leave enough time for the process to die
        self.reactor.advance(self.pm.threshold)
        # Then signal the process to stop
        self.pm.stopProcess("foo")

        # Advance the reactor just enough to give the process time to die and
        # verify that the process restarts
        self.reactor.advance(timeToDie)

        # We expect it to be restarted immediately
        self.assertEqual(self.reactor.seconds(),
                         self.pm.timeStarted["foo"])


    def test_stopProcessForcedKill(self):
        """
        L{ProcessMonitor.stopProcess} kills a process which fails to terminate
        naturally within L{ProcessMonitor.killTime} seconds.
        """
        self.pm.startService()
        self.pm.addProcess("foo", ["foo"])
        self.assertIn("foo", self.pm.protocols)
        self.reactor.advance(self.pm.threshold)
        proc = self.pm.protocols["foo"].transport
        # Arrange for the fake process to live longer than the killTime
        proc._terminationDelay = self.pm.killTime + 1
        self.pm.stopProcess("foo")
        # If process doesn't die before the killTime, procmon should
        # terminate it
        self.reactor.advance(self.pm.killTime - 1)
        self.assertEqual(0.0, self.pm.timeStarted["foo"])

        self.reactor.advance(1)
        # We expect it to be immediately restarted
        self.assertEqual(self.reactor.seconds(), self.pm.timeStarted["foo"])


    def test_stopProcessUnknownKeyError(self):
        """
        L{ProcessMonitor.stopProcess} raises a C{KeyError} if the given process
        name isn't recognised.
        """
        self.assertRaises(KeyError, self.pm.stopProcess, "foo")


    def test_stopProcessAlreadyStopped(self):
        """
        L{ProcessMonitor.stopProcess} silently returns if the named process
        is already stopped. eg Process has crashed and a restart has been
        rescheduled, but in the meantime, the service is stopped.
        """
        self.pm.addProcess("foo", ["foo"])
        self.assertIsNone(self.pm.stopProcess("foo"))


    def test_outputReceivedCompleteLine(self):
        """
        Getting a complete output line generates a log message.
        """
        events = []
        self.addCleanup(log.removeObserver, events.append)
        log.addObserver(events.append)
        self.pm.addProcess("foo", ["foo"])
        # Schedule the process to start
        self.pm.startService()
        # Advance the reactor to start the process
        self.reactor.advance(0)
        self.assertIn("foo", self.pm.protocols)
        # Long time passes
        self.reactor.advance(self.pm.threshold)
        # Process greets
        self.pm.protocols["foo"].outReceived(b'hello world!\n')
        self.assertEquals(len(events), 1)
        message = events[0]['message']
        self.assertEquals(message, tuple([u'[foo] hello world!']))


    def test_outputReceivedCompleteLineInvalidUTF8(self):
        """
        Getting invalid UTF-8 results in the repr of the raw message
        """
        events = []
        self.addCleanup(log.removeObserver, events.append)
        log.addObserver(events.append)
        self.pm.addProcess("foo", ["foo"])
        # Schedule the process to start
        self.pm.startService()
        # Advance the reactor to start the process
        self.reactor.advance(0)
        self.assertIn("foo", self.pm.protocols)
        # Long time passes
        self.reactor.advance(self.pm.threshold)
        # Process greets
        self.pm.protocols["foo"].outReceived(b'\xffhello world!\n')
        self.assertEquals(len(events), 1)
        messages = events[0]['message']
        self.assertEquals(len(messages), 1)
        message = messages[0]
        tag, output = message.split(' ', 1)
        self.assertEquals(tag, '[foo]')
        self.assertEquals(output, repr(b'\xffhello world!'))


    def test_outputReceivedPartialLine(self):
        """
        Getting partial line results in no events until process end
        """
        events = []
        self.addCleanup(log.removeObserver, events.append)
        log.addObserver(events.append)
        self.pm.addProcess("foo", ["foo"])
        # Schedule the process to start
        self.pm.startService()
        # Advance the reactor to start the process
        self.reactor.advance(0)
        self.assertIn("foo", self.pm.protocols)
        # Long time passes
        self.reactor.advance(self.pm.threshold)
        # Process greets
        self.pm.protocols["foo"].outReceived(b'hello world!')
        self.assertEquals(len(events), 0)
        self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
        self.assertEquals(len(events), 1)
        message = events[0]['message']
        self.assertEquals(message, tuple([u'[foo] hello world!']))

    def test_connectionLostLongLivedProcess(self):
        """
        L{ProcessMonitor.connectionLost} should immediately restart a process
        if it has been running longer than L{ProcessMonitor.threshold} seconds.
        """
        self.pm.addProcess("foo", ["foo"])
        # Schedule the process to start
        self.pm.startService()
        # advance the reactor to start the process
        self.reactor.advance(0)
        self.assertIn("foo", self.pm.protocols)
        # Long time passes
        self.reactor.advance(self.pm.threshold)
        # Process dies after threshold
        self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
        self.assertNotIn("foo", self.pm.protocols)
        # Process should be restarted immediately
        self.reactor.advance(0)
        self.assertIn("foo", self.pm.protocols)


    def test_connectionLostMurderCancel(self):
        """
        L{ProcessMonitor.connectionLost} cancels a scheduled process killer and
        deletes the DelayedCall from the L{ProcessMonitor.murder} list.
        """
        self.pm.addProcess("foo", ["foo"])
        # Schedule the process to start
        self.pm.startService()
        # Advance 1s to start the process then ask ProcMon to stop it
        self.reactor.advance(1)
        self.pm.stopProcess("foo")
        # A process killer has been scheduled, delayedCall is active
        self.assertIn("foo", self.pm.murder)
        delayedCall = self.pm.murder["foo"]
        self.assertTrue(delayedCall.active())
        # Advance to the point at which the dummy process exits
        self.reactor.advance(
            self.pm.protocols["foo"].transport._terminationDelay)
        # Now the delayedCall has been cancelled and deleted
        self.assertFalse(delayedCall.active())
        self.assertNotIn("foo", self.pm.murder)


    def test_connectionLostProtocolDeletion(self):
        """
        L{ProcessMonitor.connectionLost} removes the corresponding
        ProcessProtocol instance from the L{ProcessMonitor.protocols} list.
        """
        self.pm.startService()
        self.pm.addProcess("foo", ["foo"])
        self.assertIn("foo", self.pm.protocols)
        self.pm.protocols["foo"].transport.signalProcess("KILL")
        self.reactor.advance(
            self.pm.protocols["foo"].transport._terminationDelay)
        self.assertNotIn("foo", self.pm.protocols)


    def test_connectionLostMinMaxRestartDelay(self):
        """
        L{ProcessMonitor.connectionLost} will wait at least minRestartDelay s
        and at most maxRestartDelay s
        """
        self.pm.minRestartDelay = 2
        self.pm.maxRestartDelay = 3

        self.pm.startService()
        self.pm.addProcess("foo", ["foo"])

        self.assertEqual(self.pm.delay["foo"], self.pm.minRestartDelay)
        self.reactor.advance(self.pm.threshold - 1)
        self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
        self.assertEqual(self.pm.delay["foo"], self.pm.maxRestartDelay)


    def test_connectionLostBackoffDelayDoubles(self):
        """
        L{ProcessMonitor.connectionLost} doubles the restart delay each time
        the process dies too quickly.
        """
        self.pm.startService()
        self.pm.addProcess("foo", ["foo"])
        self.reactor.advance(self.pm.threshold - 1) #9s
        self.assertIn("foo", self.pm.protocols)
        self.assertEqual(self.pm.delay["foo"], self.pm.minRestartDelay)
        # process dies within the threshold and should not restart immediately
        self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
        self.assertEqual(self.pm.delay["foo"], self.pm.minRestartDelay * 2)


    def test_startService(self):
        """
        L{ProcessMonitor.startService} starts all monitored processes.
        """
        self.pm.addProcess("foo", ["foo"])
        # Schedule the process to start
        self.pm.startService()
        # advance the reactor to start the process
        self.reactor.advance(0)
        self.assertIn("foo", self.pm.protocols)


    def test_stopService(self):
        """
        L{ProcessMonitor.stopService} should stop all monitored processes.
        """
        self.pm.addProcess("foo", ["foo"])
        self.pm.addProcess("bar", ["bar"])
        # Schedule the process to start
        self.pm.startService()
        # advance the reactor to start the processes
        self.reactor.advance(self.pm.threshold)
        self.assertIn("foo", self.pm.protocols)
        self.assertIn("bar", self.pm.protocols)

        self.reactor.advance(1)

        self.pm.stopService()
        # Advance to beyond the killTime - all monitored processes
        # should have exited
        self.reactor.advance(self.pm.killTime + 1)
        # The processes shouldn't be restarted
        self.assertEqual({}, self.pm.protocols)


    def test_restartAllRestartsOneProcess(self):
        """
        L{ProcessMonitor.restartAll} succeeds when there is one process.
        """
        self.pm.addProcess("foo", ["foo"])
        self.pm.startService()
        self.reactor.advance(1)
        self.pm.restartAll()
        # Just enough time for the process to die,
        # not enough time to start a new one.
        self.reactor.advance(1)
        processes = list(self.reactor.spawnedProcesses)
        myProcess = processes.pop()
        self.assertEquals(processes, [])
        self.assertIsNone(myProcess.pid)

    def test_stopServiceCancelRestarts(self):
        """
        L{ProcessMonitor.stopService} should cancel any scheduled process
        restarts.
        """
        self.pm.addProcess("foo", ["foo"])
        # Schedule the process to start
        self.pm.startService()
        # advance the reactor to start the processes
        self.reactor.advance(self.pm.threshold)
        self.assertIn("foo", self.pm.protocols)

        self.reactor.advance(1)
        # Kill the process early
        self.pm.protocols["foo"].processEnded(Failure(ProcessDone(0)))
        self.assertTrue(self.pm.restart['foo'].active())
        self.pm.stopService()
        # Scheduled restart should have been cancelled
        self.assertFalse(self.pm.restart['foo'].active())


    def test_stopServiceCleanupScheduledRestarts(self):
        """
        L{ProcessMonitor.stopService} should cancel all scheduled process
        restarts.
        """
        self.pm.threshold = 5
        self.pm.minRestartDelay = 5
        # Start service and add a process (started immediately)
        self.pm.startService()
        self.pm.addProcess("foo", ["foo"])
        # Stop the process after 1s
        self.reactor.advance(1)
        self.pm.stopProcess("foo")
        # Wait 1s for it to exit it will be scheduled to restart 5s later
        self.reactor.advance(1)
        # Meanwhile stop the service
        self.pm.stopService()
        # Advance to beyond the process restart time
        self.reactor.advance(6)
        # The process shouldn't have restarted because stopService has cancelled
        # all pending process restarts.
        self.assertEqual(self.pm.protocols, {})



class DeprecationTests(unittest.SynchronousTestCase):

    """
    Tests that check functionality that should be deprecated is deprecated.
    """

    def setUp(self):
        """
        Create reactor and process monitor.
        """
        self.reactor = DummyProcessReactor()
        self.pm = ProcessMonitor(reactor=self.reactor)


    def test_toTuple(self):
        """
        _Process.toTuple is deprecated.

        When getting the deprecated processes property, the actual
        data (kept in the class _Process) is converted to a tuple --
        which produces a DeprecationWarning per process so converted.
        """
        self.pm.addProcess("foo", ["foo"])
        myprocesses = self.pm.processes
        self.assertEquals(len(myprocesses), 1)
        warnings = self.flushWarnings()
        foundToTuple = False
        for warning in warnings:
            self.assertIs(warning['category'], DeprecationWarning)
            if 'toTuple' in warning['message']:
                foundToTuple = True
        self.assertTrue(foundToTuple,
                        "no tuple deprecation found:{}".format(repr(warnings)))


    def test_processes(self):
        """
        Accessing L{ProcessMonitor.processes} results in deprecation warning

        Even when there are no processes, and thus no process is converted
        to a tuple, accessing the L{ProcessMonitor.processes} property
        should generate its own DeprecationWarning.
        """
        myProcesses = self.pm.processes
        self.assertEquals(myProcesses, {})
        warnings = self.flushWarnings()
        first = warnings.pop(0)
        self.assertIs(first['category'], DeprecationWarning)
        self.assertEquals(warnings, [])


    def test_getstate(self):
        """
        Pickling an L{ProcessMonitor} results in deprecation warnings
        """
        pickle.dumps(self.pm)
        warnings = self.flushWarnings()
        for warning in warnings:
            self.assertIs(warning['category'], DeprecationWarning)

Youez - 2016 - github.com/yon3zu
LinuXploit