JFIFXX    $.' ",#(7),01444'9=82<.342  2!!22222222222222222222222222222222222222222222222222"4 ,PG"Z_4˷kjزZ,F+_z,© zh6٨icfu#ډb_N?wQ5-~I8TK<5oIv-k_U_~bMdӜUHh?]EwQk{_}qFW7HTՑYF?_'ϔ_Ջt=||I 6έ"D/[k9Y8ds|\Ҿp6Ҵ].6znopM[mei$[soᘨ˸ nɜG-ĨUycP3.DBli;hjx7Z^NhN3u{:jx힞#M&jL P@_ P&o89@Sz6t7#Oߋ s}YfTlmrZ)'Nk۞pw\Tȯ?8`Oi{wﭹW[r Q4F׊3m&L=h3z~#\l :F,j@ ʱwQT8"kJO6֚l}R>ډK]y&p}b;N1mr$|7>e@BTM*-iHgD) Em|ؘbҗaҾt4oG*oCNrPQ@z,|?W[0:n,jWiEW$~/hp\?{(0+Y8rΟ+>S-SVN;}s?. w9˟<Mq4Wv'{)01mBVW[8/< %wT^5b)iM pgN&ݝVO~qu9 !J27$O-! :%H ـyΠM=t{!S oK8txA& j0 vF Y|y ~6@c1vOpIg4lODL Rcj_uX63?nkWyf;^*B @~a`Eu+6L.ü>}y}_O6͐:YrGXkGl^w~㒶syIu! W XN7BVO!X2wvGRfT#t/?%8^WaTGcLMI(J1~8?aT ]ASE(*E} 2#I/׍qz^t̔bYz4xt){ OH+(EA&NXTo"XC')}Jzp ~5}^+6wcQ|LpdH}(.|kc4^"Z?ȕ a<L!039C EuCFEwç ;n?*oB8bʝ'#RqfM}7]s2tcS{\icTx;\7KPʇ Z O-~c>"?PEO8@8GQgaՎ󁶠䧘_%#r>1zaebqcPѵn#L =׀t L7`VA{C:ge@w1 Xp3c3ġpM"'-@n4fGB3DJ8[JoߐgK)ƛ$ 83+ 6ʻ SkI*KZlT _`?KQKdB`s}>`*>,*@JdoF*弝O}ks]yߘc1GV<=776qPTtXԀ!9*44Tހ3XΛex46YD  BdemDa\_l,G/֌7Y](xTt^%GE4}bTڹ;Y)BQu>J/J ⮶.XԄjݳ+Ed r5_D1 o Bx΢#<W8R6@gM. drD>(otU@x=~v2 ӣdoBd3eO6㣷ݜ66YQz`S{\P~z m5{J/L1xO\ZFu>ck#&:`$ai>2ΔloF[hlEܺΠk:)` $[69kOw\|8}ބ:񶐕IA1/=2[,!.}gN#ub ~݊}34qdELc$"[qU硬g^%B zrpJru%v\h1Yne`ǥ:gpQM~^Xi `S:V29.PV?Bk AEvw%_9CQwKekPؠ\;Io d{ ߞoc1eP\ `E=@KIRYK2NPlLɀ)&eB+ь( JTx_?EZ }@ 6U뙢طzdWIn` D噥[uV"G&Ú2g}&m?ċ"Om# {ON"SXNeysQ@FnVgdX~nj]J58up~.`r\O,ư0oS _Ml4kv\JSdxSW<AeIX$Iw:Sy›R9Q[,5;@]%u@ *rolbI  +%m:͇ZVủθau,RW33 dJeTYE.Mϧ-oj3+yy^cVO9NV\nd1 !͕_)av;թMlWR1)ElP;yوÏu 3k5Pr6<⒲l!˞*u־n!l:UNW %Chx8vL'X@*)̮ˍ D-M+JUkvK+x8cY?Ԡ~3mo|u@[XeYC\Kpx8oCC&N~3-H MXsu<`~"WL$8ξ3a)|:@m\^`@ҷ)5p+6p%i)P Mngc#0AruzRL+xSS?ʮ}()#tmˇ!0}}y$6Lt;$ʳ{^6{v6ķܰgVcnn ~zx«,2u?cE+ȘH؎%Za)X>uWTzNyosFQƤ$*&LLXL)1" LeOɟ9=:tZcŽY?ӭVwv~,Yrۗ|yGaFC.+ v1fήJ]STBn5sW}y$~z'c 8  ,! pVNSNNqy8z˱A4*'2n<s^ǧ˭PJޮɏUGLJ*#i}K%,)[z21z ?Nin1?TIR#m-1lA`fT5+ܐcq՝ʐ,3f2Uեmab#ŠdQy>\)SLYw#.ʑf ,"+w~N'cO3FN<)j&,- љ֊_zSTǦw>?nU仆Ve0$CdrP m׈eXmVu L.bֹ [Դaզ*\y8Է:Ez\0KqC b̘cөQ=0YsNS.3.Oo:#v7[#߫ 5܎LEr49nCOWlG^0k%;YߝZǓ:S#|}y,/kLd TA(AI$+I3;Y*Z}|ӧOdv..#:nf>>ȶITX 8y"dR|)0=n46ⲑ+ra ~]R̲c?6(q;5% |uj~z8R=XIV=|{vGj\gcqz؋%Mߍ1y#@f^^>N#x#۹6Y~?dfPO{P4Vu1E1J *|%JN`eWuzk M6q t[ gGvWIGu_ft5j"Y:Tɐ*; e54q$C2d} _SL#mYpO.C;cHi#֩%+) ӍƲVSYźg |tj38r|V1#;.SQA[S#`n+$$I P\[@s(EDzP])8G#0B[ىXIIq<9~[Z멜Z⊔IWU&A>P~#dp]9 "cP Md?٥Ifتuk/F9c*9Ǎ:ØFzn*@|Iށ9N3{'['ͬҲ4#}!V Fu,,mTIkv C7vB6kT91*l '~ƞFlU'M ][ΩũJ_{iIn$L jOdxkza۪#EClx˘oVɞljr)/,߬hL#^Lф,íMƁe̩NBLiLq}(q6IçJ$WE$:=#(KBzђ xlx?>Պ+>W,Ly!_DŌlQ![ SJ1ƐY}b,+Loxɓ)=yoh@꥟/Iѭ=Py9 ۍYӘe+pJnϱ?V\SO%(t =?MR[Șd/ nlB7j !;ӥ/[-A>dNsLj ,ɪv=1c.SQO3UƀܽE̻9GϷD7(}Ävӌ\y_0[w <΍>a_[0+LF.޺f>oNTq;y\bՃyjH<|q-eɏ_?_9+PHp$[uxK wMwNی'$Y2=qKBP~Yul:[<F12O5=d]Ysw:ϮEj,_QXz`H1,#II dwrP˂@ZJVy$\y{}^~[:NߌUOdؾe${p>G3cĖlʌ ת[`ϱ-WdgIig2 }s ؤ(%#sS@~3XnRG~\jc3vӍLM[JBTs3}jNʖW;7ç?=XF=-=qߚ#='c7ڑWI(O+=:uxqe2zi+kuGR0&eniT^J~\jyp'dtGsO39* b#Ɋ p[BwsT>d4ۧsnvnU_~,vƜJ1s QIz)(lv8MU=;56Gs#KMP=LvyGd}VwWBF'à ?MHUg2 !p7Qjڴ=ju JnA suMeƆҔ!)'8Ϣٔޝ(Vpצ֖d=ICJǠ{qkԭ߸i@Ku|p=..*+xz[Aqġ#s2aƊRR)*HRsi~a &fMP-KL@ZXy'x{}Zm+:)) IJ-iu ܒH'L(7yGӜq j 6ߌg1go,kرtY?W,pefOQS!K۟cҒA|սj>=⬒˧L[ ߿2JaB~Ru:Q] 0H~]7ƼI(}cq 'ήETq?fabӥvr )o-Q_'ᴎoK;Vo%~OK *bf:-ťIR`B5!RB@ï u ̯e\_U_ gES3QTaxU<~c?*#]MW,[8Oax]1bC|踤Plw5V%){t<d50iXSUm:Z┵i"1^B-PhJ&)O*DcWvM)}Pܗ-q\mmζZ-l@}aE6F@&Sg@ݚM ȹ 4#p\HdYDoH"\..RBHz_/5˘6KhJRPmƶim3,#ccoqa)*PtRmk7xDE\Y閣_X<~)c[[BP6YqS0%_;Àv~| VS؇ 'O0F0\U-d@7SJ*z3nyPOm~P3|Yʉr#CSN@ ƮRN)r"C:: #qbY. 6[2K2uǦHYRQMV G$Q+.>nNHq^ qmMVD+-#*U̒ p욳u:IBmPV@Or[b= 1UE_NmyKbNOU}the`|6֮P>\2PVIDiPO;9rmAHGWS]J*_G+kP2KaZH'KxWMZ%OYDRc+o?qGhmdSoh\D|:WUAQc yTq~^H/#pCZTI1ӏT4"ČZ}`w#*,ʹ 0i課Om*da^gJ݅{le9uF#Tֲ̲ٞC"qߍ ոޑo#XZTp@ o8(jdxw],f`~|,s^f1t|m򸄭/ctr5s79Q4H1꠲BB@l9@C+wpxu£Yc9?`@#omHs2)=2.ljg9$YS%*LRY7Z,*=䷘$armoϰUW.|rufIGwtZwo~5 YյhO+=8fF)W7L9lM̘·Y֘YLf큹pRF99.A "wz=E\Z'a 2Ǚ#;'}G*l^"q+2FQ hjkŦ${ޮ-T٭cf|3#~RJt$b(R(rdx >U b&9,>%E\ Άe$'q't*אެb-|dSBOO$R+H)܎K1m`;J2Y~9Og8=vqD`K[F)k[1m޼cn]skz$@)!I x՝"v9=ZA=`Ɠi :E)`7vI}dYI_ o:obo 3Q&D&2= Ά;>hy.*ⅥSӬ+q&j|UƧ}J0WW< ۋS)jQRjƯrN)Gű4Ѷ(S)Ǣ8iW52No˓ ۍ%5brOnL;n\G=^UdI8$&h'+(cȁ߫klS^cƗjԌEꭔgFȒ@}O*;evWVYJ\]X'5ղkFb 6Ro՜mi Ni>J?lPmU}>_Z&KKqrIDՉ~q3fL:Se>E-G{L6pe,8QIhaXaUA'ʂs+טIjP-y8ۈZ?J$WP Rs]|l(ԓsƊio(S0Y 8T97.WiLc~dxcE|2!XKƘਫ਼$((6~|d9u+qd^389Y6L.I?iIq9)O/뚅OXXVZF[یgQLK1RҖr@v#XlFНyS87kF!AsM^rkpjPDyS$Nqnxҍ!Uf!ehi2m`YI9r6 TFC}/y^Η5d'9A-J>{_l+`A['յϛ#w:݅%X}&PStQ"-\縵/$ƗhXb*yBS;Wջ_mcvt?2}1;qSdd~u:2k52R~z+|HE!)Ǟl7`0<,2*Hl-x^'_TVgZA'j ^2ΪN7t?w x1fIzC-ȖK^q;-WDvT78Z hK(P:Q- 8nZ܃e貾<1YT<,"6{/ ?͟|1:#gW>$dJdB=jf[%rE^il:BxSּ1հ,=*7 fcG#q eh?27,!7x6nLC4x},GeǝtC.vS F43zz\;QYC,6~;RYS/6|25vTimlv& nRh^ejRLGf? ۉҬܦƩ|Ȱ>3!viʯ>vオX3e_1zKȗ\qHS,EW[㺨uch⍸O}a>q6n6N6qN ! 1AQaq0@"2BRb#Pr3C`Scst$4D%Td ?Na3mCwxAmqmm$4n淿t'C"wzU=D\R+wp+YT&պ@ƃ3ޯ?AﶂaŘ@-Q=9Dռѻ@MVP܅G5fY6# ?0UQ,IX(6ڵ[DIMNލc&υj\XR|,4 jThAe^db#$]wOӪ1y%LYm뭛CUƃߜ}Cy1XνmF8jI]HۺиE@Ii;r8ӭVFՇ| &?3|xBMuSGe=Ӕ#BE5GY!z_eqр/W>|-Ci߇t1ޯќdR3ug=0 5[?#͏qcfH{ ?u=??ǯ}ZzhmΔBFTWPxs}G93 )gGR<>r h$'nchPBjJҧH -N1N?~}-q!=_2hcMlvY%UE@|vM2.Y[|y"EïKZF,ɯ?,q?vM 80jx";9vk+ ֧ ȺU?%vcVmA6Qg^MA}3nl QRNl8kkn'(M7m9وq%ޟ*h$Zk"$9: ?U8Sl,,|ɒxH(ѷGn/Q4PG%Ա8N! &7;eKM749R/%lc>x;>C:th?aKXbheᜋ^$Iհ hr7%F$EFdt5+(M6tÜUU|zW=aTsTgdqPQb'm1{|YXNb P~F^F:k6"j! Ir`1&-$Bevk:y#ywI0x=D4tUPZHڠ底taP6b>xaQ# WeFŮNjpJ* mQN*I-*ȩFg3 5Vʊɮa5FO@{NX?H]31Ri_uѕ 0 F~:60p͈SqX#a5>`o&+<2D: ڝ$nP*)N|yEjF5ټeihyZ >kbHavh-#!Po=@k̆IEN@}Ll?jO߭ʞQ|A07xwt!xfI2?Z<ץTcUj]陎Ltl }5ϓ$,Omˊ;@OjEj(ا,LXLOЦ90O .anA7j4 W_ٓzWjcBy՗+EM)dNg6y1_xp$Lv:9"zpʙ$^JԼ*ϭo=xLj6Ju82AH3$ٕ@=Vv]'qEz;I˼)=ɯx /W(Vp$ mu񶤑OqˎTr㠚xsrGCbypG1ߠw e8$⿄/M{*}W]˷.CK\ުx/$WPwr |i&}{X >$-l?-zglΆ(FhvS*b߲ڡn,|)mrH[a3ר[13o_U3TC$(=)0kgP u^=4 WYCҸ:vQרXàtkm,t*^,}D* "(I9R>``[~Q]#afi6l86:,ssN6j"A4IuQ6E,GnHzSHOuk5$I4ؤQ9@CwpBGv[]uOv0I4\yQѸ~>Z8Taqޣ;za/SI:ܫ_|>=Z8:SUIJ"IY8%b8H:QO6;7ISJҌAά3>cE+&jf$eC+z;V rʺmyeaQf&6ND.:NTvm<- uǝ\MvZYNNT-A>jr!SnO 13Ns%3D@`ܟ 1^c< aɽ̲Xë#w|ycW=9I*H8p^(4՗karOcWtO\ƍR8'KIQ?5>[}yUײ -h=% qThG2)"ו3]!kB*pFDlA,eEiHfPs5H:Փ~H0DتDIhF3c2E9H5zԑʚiX=:mxghd(v׊9iSOd@0ڽ:p5h-t&Xqӕ,ie|7A2O%PEhtjY1wЃ!  ࢽMy7\a@ţJ 4ȻF@o̒?4wx)]P~u57X 9^ܩU;Iꭆ 5 eK27({|Y׎ V\"Z1 Z}(Ǝ"1S_vE30>p; ΝD%xW?W?vo^Vidr[/&>~`9Why;R ;;ɮT?r$g1KACcKl:'3 cﳯ*"t8~l)m+U,z`(>yJ?h>]vЍG*{`;y]IT ;cNUfo¾h/$|NS1S"HVT4uhǜ]v;5͠x'C\SBplh}N ABx%ޭl/Twʽ]D=Kžr㻠l4SO?=k M: cCa#ha)ѐxcsgPiG{+xQI= zԫ+ 8"kñj=|c yCF/*9жh{ ?4o kmQNx;Y4膚aw?6>e]Qr:g,i"ԩA*M7qB?ӕFhV25r[7 Y }LR}*sg+xr2U=*'WSZDW]WǞ<叓{$9Ou4y90-1'*D`c^o?(9uݐ'PI& fJݮ:wSjfP1F:X H9dԯ˝[_54 }*;@ܨ ðynT?ןd#4rGͨH1|-#MrS3G3).᧏3vz֑r$G"`j 1tx0<ƆWh6y6,œGagAyb)hDß_mü gG;evݝnQ C-*oyaMI><]obD":GA-\%LT8c)+y76oQ#*{(F⽕y=rW\p۩cA^e6KʐcVf5$'->ՉN"F"UQ@fGb~#&M=8טJNu9D[̤so~ G9TtW^g5y$bY'سǴ=U-2 #MCt(i lj@Q 5̣i*OsxKf}\M{EV{υƇ);HIfeLȣr2>WIȂ6ik 5YOxȺ>Yf5'|H+98pjn.OyjY~iw'l;s2Y:'lgꥴ)o#'SaaKZ m}`169n"xI *+ }FP"l45'ZgE8?[X7(.Q-*ތL@̲v.5[=t\+CNܛ,gSQnH}*FG16&:t4ُ"Ạ$b |#rsaT ]ӽDP7ո0y)e$ٕvIh'QEAm*HRI=: 4牢) %_iNݧl] NtGHL ɱg<1V,J~ٹ"KQ 9HS9?@kr;we݁]I!{ @G["`J:n]{cAEVʆ#U96j#Ym\qe4hB7Cdv\MNgmAyQL4uLjj9#44tl^}LnR!t±]rh6ٍ>yҏNfU  Fm@8}/ujb9he:AyծwGpΧh5l}3p468)Udc;Us/֔YX1O2uqs`hwgr~{ RmhN؎*q 42*th>#E#HvOq}6e\,Wk#Xb>p}դ3T5†6[@Py*n|'f֧>lư΂̺SU'*qp_SM 'c6m ySʨ;MrƋmKxo,GmPAG:iw9}M(^V$ǒѽ9| aJSQarB;}ٻ֢2%Uc#gNaݕ'v[OY'3L3;,p]@S{lsX'cjwk'a.}}& dP*bK=ɍ!;3ngΊUߴmt'*{,=SzfD Ako~Gaoq_mi}#mPXhύmxǍ΂巿zfQc|kc?WY$_Lvl߶c`?ljݲˏ!V6UЂ(A4y)HpZ_x>eR$/`^'3qˏ-&Q=?CFVR DfV9{8gnh(P"6[D< E~0<@`G6Hгcc cK.5DdB`?XQ2ٿyqo&+1^ DW0ꊩG#QnL3c/x 11[yxპCWCcUĨ80me4.{muI=f0QRls9f9~fǨa"@8ȁQ#cicG$Gr/$W(WV"m7[mAmboD j۳ l^kh׽ # iXnveTka^Y4BNĕ0 !01@Q"2AaPq3BR?@4QT3,㺠W[=JKϞ2r^7vc:9 EߴwS#dIxu:Hp9E! V 2;73|F9Y*ʬFDu&y؟^EAA(ɩ^GV:ݜDy`Jr29ܾ㝉[E;FzxYGUeYC v-txIsםĘqEb+P\ :>iC';k|zرny]#ǿbQw(r|ӹs[D2v-%@;8<a[\o[ϧwI!*0krs)[J9^ʜp1) "/_>o<1AEy^C`x1'ܣnps`lfQ):lb>MejH^?kl3(z:1ŠK&?Q~{ٺhy/[V|6}KbXmn[-75q94dmc^h X5G-}دBޟ |rtMV+]c?-#ڛ^ǂ}LkrOu>-Dry D?:ޞUǜ7V?瓮"#rչģVR;n/_ ؉vݶe5db9/O009G5nWJpA*r9>1.[tsFnQ V 77R]ɫ8_0<՜IFu(v4Fk3E)N:yڮeP`1}$WSJSQNjٺ޵#lј(5=5lǏmoWv-1v,Wmn߀$x_DȬ0¤#QR[Vkzmw"9ZG7'[=Qj8R?zf\a=OU*oBA|G254 p.w7  &ξxGHp B%$gtЏ򤵍zHNuЯ-'40;_3 !01"@AQa2Pq#3BR?ʩcaen^8F<7;EA{EÖ1U/#d1an.1ě0ʾRh|RAo3m3 % 28Q yφHTo7lW>#i`qca m,B-j݋'mR1Ήt>Vps0IbIC.1Rea]H64B>o]($Bma!=?B KǾ+Ծ"nK*+[T#{EJSQs5:U\wĐf3܆&)IԆwE TlrTf6Q|Rh:[K zc֧GC%\_a84HcObiؖV7H )*ģK~Xhչ04?0 E<}3#u? |gS6ꊤ|I#Hڛ աwX97Ŀ%SLy6č|Fa 8b$sקhb9RAu7˨pČ_\*w묦F 4D~f|("mNKiS>$d7SlA/²SL|6N}S˯g]6; #. 403WebShell
403Webshell
Server IP : 13.127.148.211  /  Your IP : 216.73.216.149
Web Server : Apache/2.4.41 (Ubuntu)
System : Linux ip-172-31-43-195 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 06:59:36 UTC 2025 x86_64
User : www-data ( 33)
PHP Version : 7.4.3-4ubuntu2.29
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /proc/thread-self/root/usr/lib/python3/dist-packages/twisted/conch/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/thread-self/root/usr/lib/python3/dist-packages/twisted/conch/test/test_manhole.py
# -*- test-case-name: twisted.conch.test.test_manhole -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

# pylint: disable=I0011,W9401,W9402

"""
Tests for L{twisted.conch.manhole}.
"""

import traceback

from twisted.trial import unittest
from twisted.internet import error, defer
from twisted.test.proto_helpers import StringTransport
from twisted.conch.test.test_recvline import (
    _TelnetMixin, _SSHMixin, _StdioMixin, stdio, ssh)
from twisted.conch import manhole
from twisted.conch.insults import insults


def determineDefaultFunctionName():
    """
    Return the string used by Python as the name for code objects which are
    compiled from interactive input or at the top-level of modules.
    """
    try:
        1 // 0
    except:
        # The last frame is this function.  The second to last frame is this
        # function's caller, which is module-scope, which is what we want,
        # so -2.
        return traceback.extract_stack()[-2][2]
defaultFunctionName = determineDefaultFunctionName()



class ManholeInterpreterTests(unittest.TestCase):
    """
    Tests for L{manhole.ManholeInterpreter}.
    """
    def test_resetBuffer(self):
        """
        L{ManholeInterpreter.resetBuffer} should empty the input buffer.
        """
        interpreter = manhole.ManholeInterpreter(None)
        interpreter.buffer.extend(["1", "2"])
        interpreter.resetBuffer()
        self.assertFalse(interpreter.buffer)



class ManholeProtocolTests(unittest.TestCase):
    """
    Tests for L{manhole.Manhole}.
    """
    def test_interruptResetsInterpreterBuffer(self):
        """
        L{manhole.Manhole.handle_INT} should cause the interpreter input buffer
        to be reset.
        """
        transport = StringTransport()
        terminal = insults.ServerProtocol(manhole.Manhole)
        terminal.makeConnection(transport)
        protocol = terminal.terminalProtocol
        interpreter = protocol.interpreter
        interpreter.buffer.extend(["1", "2"])
        protocol.handle_INT()
        self.assertFalse(interpreter.buffer)



class WriterTests(unittest.TestCase):
    def test_Integer(self):
        """
        Colorize an integer.
        """
        manhole.lastColorizedLine("1")


    def test_DoubleQuoteString(self):
        """
        Colorize an integer in double quotes.
        """
        manhole.lastColorizedLine('"1"')


    def test_SingleQuoteString(self):
        """
        Colorize an integer in single quotes.
        """
        manhole.lastColorizedLine("'1'")


    def test_TripleSingleQuotedString(self):
        """
        Colorize an integer in triple quotes.
        """
        manhole.lastColorizedLine("'''1'''")


    def test_TripleDoubleQuotedString(self):
        """
        Colorize an integer in triple and double quotes.
        """
        manhole.lastColorizedLine('"""1"""')


    def test_FunctionDefinition(self):
        """
        Colorize a function definition.
        """
        manhole.lastColorizedLine("def foo():")


    def test_ClassDefinition(self):
        """
        Colorize a class definition.
        """
        manhole.lastColorizedLine("class foo:")


    def test_unicode(self):
        """
        Colorize a Unicode string.
        """
        res = manhole.lastColorizedLine(u"\u0438")
        self.assertTrue(isinstance(res, bytes))


    def test_bytes(self):
        """
        Colorize a UTF-8 byte string.
        """
        res = manhole.lastColorizedLine(b"\xd0\xb8")
        self.assertTrue(isinstance(res, bytes))


    def test_identicalOutput(self):
        """
        The output of UTF-8 bytestrings and Unicode strings are identical.
        """
        self.assertEqual(manhole.lastColorizedLine(b"\xd0\xb8"),
                         manhole.lastColorizedLine(u"\u0438"))



class ManholeLoopbackMixin:
    serverProtocol = manhole.ColoredManhole


    def wfd(self, d):
        return defer.waitForDeferred(d)


    def test_SimpleExpression(self):
        """
        Evaluate simple expression.
        """
        done = self.recvlineClient.expect(b"done")

        self._testwrite(
            b"1 + 1\n"
            b"done")

        def finished(ign):
            self._assertBuffer(
                [b">>> 1 + 1",
                 b"2",
                 b">>> done"])

        return done.addCallback(finished)


    def test_TripleQuoteLineContinuation(self):
        """
        Evaluate line continuation in triple quotes.
        """
        done = self.recvlineClient.expect(b"done")

        self._testwrite(
            b"'''\n'''\n"
            b"done")

        def finished(ign):
            self._assertBuffer(
                [b">>> '''",
                 b"... '''",
                 b"'\\n'",
                 b">>> done"])

        return done.addCallback(finished)


    def test_FunctionDefinition(self):
        """
        Evaluate function definition.
        """
        done = self.recvlineClient.expect(b"done")

        self._testwrite(
            b"def foo(bar):\n"
            b"\tprint(bar)\n\n"
            b"foo(42)\n"
            b"done")

        def finished(ign):
            self._assertBuffer(
                [b">>> def foo(bar):",
                 b"...     print(bar)",
                 b"... ",
                 b">>> foo(42)",
                 b"42",
                 b">>> done"])

        return done.addCallback(finished)


    def test_ClassDefinition(self):
        """
        Evaluate class definition.
        """
        done = self.recvlineClient.expect(b"done")
        self._testwrite(
            b"class Foo:\n"
            b"\tdef bar(self):\n"
            b"\t\tprint('Hello, world!')\n\n"
            b"Foo().bar()\n"
            b"done")

        def finished(ign):
            self._assertBuffer(
                [b">>> class Foo:",
                 b"...     def bar(self):",
                 b"...         print('Hello, world!')",
                 b"... ",
                 b">>> Foo().bar()",
                 b"Hello, world!",
                 b">>> done"])

        return done.addCallback(finished)


    def test_Exception(self):
        """
        Evaluate raising an exception.
        """
        done = self.recvlineClient.expect(b"done")

        self._testwrite(
            b"raise Exception('foo bar baz')\n"
            b"done")

        def finished(ign):
            self._assertBuffer(
                [b">>> raise Exception('foo bar baz')",
                 b"Traceback (most recent call last):",
                 b'  File "<console>", line 1, in ' +
                 defaultFunctionName.encode("utf-8"),
                 b"Exception: foo bar baz",
                 b">>> done"])

        return done.addCallback(finished)


    def test_ControlC(self):
        """
        Evaluate interrupting with CTRL-C.
        """
        done = self.recvlineClient.expect(b"done")

        self._testwrite(
            b"cancelled line" + manhole.CTRL_C +
            b"done")

        def finished(ign):
            self._assertBuffer(
                [b">>> cancelled line",
                 b"KeyboardInterrupt",
                 b">>> done"])

        return done.addCallback(finished)


    def test_interruptDuringContinuation(self):
        """
        Sending ^C to Manhole while in a state where more input is required to
        complete a statement should discard the entire ongoing statement and
        reset the input prompt to the non-continuation prompt.
        """
        continuing = self.recvlineClient.expect(b"things")

        self._testwrite(b"(\nthings")

        def gotContinuation(ignored):
            self._assertBuffer(
                [b">>> (",
                 b"... things"])
            interrupted = self.recvlineClient.expect(b">>> ")
            self._testwrite(manhole.CTRL_C)
            return interrupted
        continuing.addCallback(gotContinuation)

        def gotInterruption(ignored):
            self._assertBuffer(
                [b">>> (",
                 b"... things",
                 b"KeyboardInterrupt",
                 b">>> "])
        continuing.addCallback(gotInterruption)
        return continuing


    def test_ControlBackslash(self):
        """
        Evaluate cancelling with CTRL-\.
        """
        self._testwrite(b"cancelled line")
        partialLine = self.recvlineClient.expect(b"cancelled line")

        def gotPartialLine(ign):
            self._assertBuffer(
                [b">>> cancelled line"])
            self._testwrite(manhole.CTRL_BACKSLASH)

            d = self.recvlineClient.onDisconnection
            return self.assertFailure(d, error.ConnectionDone)

        def gotClearedLine(ign):
            self._assertBuffer(
                [b""])

        return partialLine.addCallback(gotPartialLine).addCallback(
            gotClearedLine)


    @defer.inlineCallbacks
    def test_controlD(self):
        """
        A CTRL+D in the middle of a line doesn't close a connection,
        but at the beginning of a line it does.
        """
        self._testwrite(b"1 + 1")
        yield self.recvlineClient.expect(br"\+ 1")
        self._assertBuffer([b">>> 1 + 1"])

        self._testwrite(manhole.CTRL_D + b" + 1")
        yield self.recvlineClient.expect(br"\+ 1")
        self._assertBuffer([b">>> 1 + 1 + 1"])

        self._testwrite(b"\n")
        yield self.recvlineClient.expect(b"3\n>>> ")

        self._testwrite(manhole.CTRL_D)
        d = self.recvlineClient.onDisconnection
        yield self.assertFailure(d, error.ConnectionDone)


    @defer.inlineCallbacks
    def test_ControlL(self):
        """
        CTRL+L is generally used as a redraw-screen command in terminal
        applications.  Manhole doesn't currently respect this usage of it,
        but it should at least do something reasonable in response to this
        event (rather than, say, eating your face).
        """
        # Start off with a newline so that when we clear the display we can
        # tell by looking for the missing first empty prompt line.
        self._testwrite(b"\n1 + 1")
        yield self.recvlineClient.expect(br"\+ 1")
        self._assertBuffer([b">>> ", b">>> 1 + 1"])

        self._testwrite(manhole.CTRL_L + b" + 1")
        yield self.recvlineClient.expect(br"1 \+ 1 \+ 1")
        self._assertBuffer([b">>> 1 + 1 + 1"])


    def test_controlA(self):
        """
        CTRL-A can be used as HOME - returning cursor to beginning of
        current line buffer.
        """
        self._testwrite(b'rint "hello"' + b'\x01' + b'p')
        d = self.recvlineClient.expect(b'print "hello"')
        def cb(ignore):
            self._assertBuffer([b'>>> print "hello"'])
        return d.addCallback(cb)


    def test_controlE(self):
        """
        CTRL-E can be used as END - setting cursor to end of current
        line buffer.
        """
        self._testwrite(b'rint "hello' + b'\x01' + b'p' + b'\x05' + b'"')
        d = self.recvlineClient.expect(b'print "hello"')
        def cb(ignore):
            self._assertBuffer([b'>>> print "hello"'])
        return d.addCallback(cb)


    @defer.inlineCallbacks
    def test_deferred(self):
        """
        When a deferred is returned to the manhole REPL, it is displayed with
        a sequence number, and when the deferred fires, the result is printed.
        """
        self._testwrite(
            b"from twisted.internet import defer, reactor\n"
            b"d = defer.Deferred()\n"
            b"d\n")

        yield self.recvlineClient.expect(b"<Deferred #0>")

        self._testwrite(
            b"c = reactor.callLater(0.1, d.callback, 'Hi!')\n")
        yield self.recvlineClient.expect(b">>> ")

        yield self.recvlineClient.expect(
            b"Deferred #0 called back: 'Hi!'\n>>> ")
        self._assertBuffer(
            [b">>> from twisted.internet import defer, reactor",
             b">>> d = defer.Deferred()",
             b">>> d",
             b"<Deferred #0>",
             b">>> c = reactor.callLater(0.1, d.callback, 'Hi!')",
             b"Deferred #0 called back: 'Hi!'",
             b">>> "])



class ManholeLoopbackTelnetTests(_TelnetMixin, unittest.TestCase,
                                 ManholeLoopbackMixin):
    """
    Test manhole loopback over Telnet.
    """
    pass



class ManholeLoopbackSSHTests(_SSHMixin, unittest.TestCase,
                              ManholeLoopbackMixin):
    """
    Test manhole loopback over SSH.
    """
    if ssh is None:
        skip = "cryptography requirements missing"



class ManholeLoopbackStdioTests(_StdioMixin, unittest.TestCase,
                                ManholeLoopbackMixin):
    """
    Test manhole loopback over standard IO.
    """
    if stdio is None:
        skip = "Terminal requirements missing"
    else:
        serverProtocol = stdio.ConsoleManhole



class ManholeMainTests(unittest.TestCase):
    """
    Test the I{main} method from the I{manhole} module.
    """
    if stdio is None:
        skip = "Terminal requirements missing"


    def test_mainClassNotFound(self):
        """
        Will raise an exception when called with an argument which is a
        dotted patch which can not be imported..
        """
        exception = self.assertRaises(
            ValueError,
            stdio.main, argv=['no-such-class'],
            )

        self.assertEqual('Empty module name', exception.args[0])

Youez - 2016 - github.com/yon3zu
LinuXploit