JFIFXX    $.' ",#(7),01444'9=82<.342  2!!22222222222222222222222222222222222222222222222222"4 ,PG"Z_4˷kjزZ,F+_z,© zh6٨icfu#ډb_N?wQ5-~I8TK<5oIv-k_U_~bMdӜUHh?]EwQk{_}qFW7HTՑYF?_'ϔ_Ջt=||I 6έ"D/[k9Y8ds|\Ҿp6Ҵ].6znopM[mei$[soᘨ˸ nɜG-ĨUycP3.DBli;hjx7Z^NhN3u{:jx힞#M&jL P@_ P&o89@Sz6t7#Oߋ s}YfTlmrZ)'Nk۞pw\Tȯ?8`Oi{wﭹW[r Q4F׊3m&L=h3z~#\l :F,j@ ʱwQT8"kJO6֚l}R>ډK]y&p}b;N1mr$|7>e@BTM*-iHgD) Em|ؘbҗaҾt4oG*oCNrPQ@z,|?W[0:n,jWiEW$~/hp\?{(0+Y8rΟ+>S-SVN;}s?. w9˟<Mq4Wv'{)01mBVW[8/< %wT^5b)iM pgN&ݝVO~qu9 !J27$O-! :%H ـyΠM=t{!S oK8txA& j0 vF Y|y ~6@c1vOpIg4lODL Rcj_uX63?nkWyf;^*B @~a`Eu+6L.ü>}y}_O6͐:YrGXkGl^w~㒶syIu! W XN7BVO!X2wvGRfT#t/?%8^WaTGcLMI(J1~8?aT ]ASE(*E} 2#I/׍qz^t̔bYz4xt){ OH+(EA&NXTo"XC')}Jzp ~5}^+6wcQ|LpdH}(.|kc4^"Z?ȕ a<L!039C EuCFEwç ;n?*oB8bʝ'#RqfM}7]s2tcS{\icTx;\7KPʇ Z O-~c>"?PEO8@8GQgaՎ󁶠䧘_%#r>1zaebqcPѵn#L =׀t L7`VA{C:ge@w1 Xp3c3ġpM"'-@n4fGB3DJ8[JoߐgK)ƛ$ 83+ 6ʻ SkI*KZlT _`?KQKdB`s}>`*>,*@JdoF*弝O}ks]yߘc1GV<=776qPTtXԀ!9*44Tހ3XΛex46YD  BdemDa\_l,G/֌7Y](xTt^%GE4}bTڹ;Y)BQu>J/J ⮶.XԄjݳ+Ed r5_D1 o Bx΢#<W8R6@gM. drD>(otU@x=~v2 ӣdoBd3eO6㣷ݜ66YQz`S{\P~z m5{J/L1xO\ZFu>ck#&:`$ai>2ΔloF[hlEܺΠk:)` $[69kOw\|8}ބ:񶐕IA1/=2[,!.}gN#ub ~݊}34qdELc$"[qU硬g^%B zrpJru%v\h1Yne`ǥ:gpQM~^Xi `S:V29.PV?Bk AEvw%_9CQwKekPؠ\;Io d{ ߞoc1eP\ `E=@KIRYK2NPlLɀ)&eB+ь( JTx_?EZ }@ 6U뙢طzdWIn` D噥[uV"G&Ú2g}&m?ċ"Om# {ON"SXNeysQ@FnVgdX~nj]J58up~.`r\O,ư0oS _Ml4kv\JSdxSW<AeIX$Iw:Sy›R9Q[,5;@]%u@ *rolbI  +%m:͇ZVủθau,RW33 dJeTYE.Mϧ-oj3+yy^cVO9NV\nd1 !͕_)av;թMlWR1)ElP;yوÏu 3k5Pr6<⒲l!˞*u־n!l:UNW %Chx8vL'X@*)̮ˍ D-M+JUkvK+x8cY?Ԡ~3mo|u@[XeYC\Kpx8oCC&N~3-H MXsu<`~"WL$8ξ3a)|:@m\^`@ҷ)5p+6p%i)P Mngc#0AruzRL+xSS?ʮ}()#tmˇ!0}}y$6Lt;$ʳ{^6{v6ķܰgVcnn ~zx«,2u?cE+ȘH؎%Za)X>uWTzNyosFQƤ$*&LLXL)1" LeOɟ9=:tZcŽY?ӭVwv~,Yrۗ|yGaFC.+ v1fήJ]STBn5sW}y$~z'c 8  ,! 
pVNSNNqy8z˱A4*'2n<s^ǧ˭PJޮɏUGLJ*#i}K%,)[z21z ?Nin1?TIR#m-1lA`fT5+ܐcq՝ʐ,3f2Uեmab#ŠdQy>\)SLYw#.ʑf ,"+w~N'cO3FN<)j&,- љ֊_zSTǦw>?nU仆Ve0$CdrP m׈eXmVu L.bֹ [Դaզ*\y8Է:Ez\0KqC b̘cөQ=0YsNS.3.Oo:#v7[#߫ 5܎LEr49nCOWlG^0k%;YߝZǓ:S#|}y,/kLd TA(AI$+I3;Y*Z}|ӧOdv..#:nf>>ȶITX 8y"dR|)0=n46ⲑ+ra ~]R̲c?6(q;5% |uj~z8R=XIV=|{vGj\gcqz؋%Mߍ1y#@f^^>N#x#۹6Y~?dfPO{P4Vu1E1J *|%JN`eWuzk M6q t[ gGvWIGu_ft5j"Y:Tɐ*; e54q$C2d} _SL#mYpO.C;cHi#֩%+) ӍƲVSYźg |tj38r|V1#;.SQA[S#`n+$$I P\[@s(EDzP])8G#0B[ىXIIq<9~[Z멜Z⊔IWU&A>P~#dp]9 "cP Md?٥Ifتuk/F9c*9Ǎ:ØFzn*@|Iށ9N3{'['ͬҲ4#}!V Fu,,mTIkv C7vB6kT91*l '~ƞFlU'M ][ΩũJ_{iIn$L jOdxkza۪#EClx˘oVɞljr)/,߬hL#^Lф,íMƁe̩NBLiLq}(q6IçJ$WE$:=#(KBzђ xlx?>Պ+>W,Ly!_DŌlQ![ SJ1ƐY}b,+Loxɓ)=yoh@꥟/Iѭ=Py9 ۍYӘe+pJnϱ?V\SO%(t =?MR[Șd/ nlB7j !;ӥ/[-A>dNsLj ,ɪv=1c.SQO3UƀܽE̻9GϷD7(}Ävӌ\y_0[w <΍>a_[0+LF.޺f>oNTq;y\bՃyjH<|q-eɏ_?_9+PHp$[uxK wMwNی'$Y2=qKBP~Yul:[<F12O5=d]Ysw:ϮEj,_QXz`H1,#II dwrP˂@ZJVy$\y{}^~[:NߌUOdؾe${p>G3cĖlʌ ת[`ϱ-WdgIig2 }s ؤ(%#sS@~3XnRG~\jc3vӍLM[JBTs3}jNʖW;7ç?=XF=-=qߚ#='c7ڑWI(O+=:uxqe2zi+kuGR0&eniT^J~\jyp'dtGsO39* b#Ɋ p[BwsT>d4ۧsnvnU_~,vƜJ1s QIz)(lv8MU=;56Gs#KMP=LvyGd}VwWBF'à ?MHUg2 !p7Qjڴ=ju JnA suMeƆҔ!)'8Ϣٔޝ(Vpצ֖d=ICJǠ{qkԭ߸i@Ku|p=..*+xz[Aqġ#s2aƊRR)*HRsi~a &fMP-KL@ZXy'x{}Zm+:)) IJ-iu ܒH'L(7yGӜq j 6ߌg1go,kرtY?W,pefOQS!K۟cҒA|սj>=⬒˧L[ ߿2JaB~Ru:Q] 0H~]7ƼI(}cq 'ήETq?fabӥvr )o-Q_'ᴎoK;Vo%~OK *bf:-ťIR`B5!RB@ï u ̯e\_U_ gES3QTaxU<~c?*#]MW,[8Oax]1bC|踤Plw5V%){t<d50iXSUm:Z┵i"1^B-PhJ&)O*DcWvM)}Pܗ-q\mmζZ-l@}aE6F@&Sg@ݚM ȹ 4#p\HdYDoH"\..RBHz_/5˘6KhJRPmƶim3,#ccoqa)*PtRmk7xDE\Y閣_X<~)c[[BP6YqS0%_;Àv~| VS؇ 'O0F0\U-d@7SJ*z3nyPOm~P3|Yʉr#CSN@ ƮRN)r"C:: #qbY. 
6[2K2uǦHYRQMV G$Q+.>nNHq^ qmMVD+-#*U̒ p욳u:IBmPV@Or[b= 1UE_NmyKbNOU}the`|6֮P>\2PVIDiPO;9rmAHGWS]J*_G+kP2KaZH'KxWMZ%OYDRc+o?qGhmdSoh\D|:WUAQc yTq~^H/#pCZTI1ӏT4"ČZ}`w#*,ʹ 0i課Om*da^gJ݅{le9uF#Tֲ̲ٞC"qߍ ոޑo#XZTp@ o8(jdxw],f`~|,s^f1t|m򸄭/ctr5s79Q4H1꠲BB@l9@C+wpxu£Yc9?`@#omHs2)=2.ljg9$YS%*LRY7Z,*=䷘$armoϰUW.|rufIGwtZwo~5 YյhO+=8fF)W7L9lM̘·Y֘YLf큹pRF99.A "wz=E\Z'a 2Ǚ#;'}G*l^"q+2FQ hjkŦ${ޮ-T٭cf|3#~RJt$b(R(rdx >U b&9,>%E\ Άe$'q't*אެb-|dSBOO$R+H)܎K1m`;J2Y~9Og8=vqD`K[F)k[1m޼cn]skz$@)!I x՝"v9=ZA=`Ɠi :E)`7vI}dYI_ o:obo 3Q&D&2= Ά;>hy.*ⅥSӬ+q&j|UƧ}J0WW< ۋS)jQRjƯrN)Gű4Ѷ(S)Ǣ8iW52No˓ ۍ%5brOnL;n\G=^UdI8$&h'+(cȁ߫klS^cƗjԌEꭔgFȒ@}O*;evWVYJ\]X'5ղkFb 6Ro՜mi Ni>J?lPmU}>_Z&KKqrIDՉ~q3fL:Se>E-G{L6pe,8QIhaXaUA'ʂs+טIjP-y8ۈZ?J$WP Rs]|l(ԓsƊio(S0Y 8T97.WiLc~dxcE|2!XKƘਫ਼$((6~|d9u+qd^389Y6L.I?iIq9)O/뚅OXXVZF[یgQLK1RҖr@v#XlFНyS87kF!AsM^rkpjPDyS$Nqnxҍ!Uf!ehi2m`YI9r6 TFC}/y^Η5d'9A-J>{_l+`A['յϛ#w:݅%X}&PStQ"-\縵/$ƗhXb*yBS;Wջ_mcvt?2}1;qSdd~u:2k52R~z+|HE!)Ǟl7`0<,2*Hl-x^'_TVgZA'j ^2ΪN7t?w x1fIzC-ȖK^q;-WDvT78Z hK(P:Q- 8nZ܃e貾<1YT<,"6{/ ?͟|1:#gW>$dJdB=jf[%rE^il:BxSּ1հ,=*7 fcG#q eh?27,!7x6nLC4x},GeǝtC.vS F43zz\;QYC,6~;RYS/6|25vTimlv& nRh^ejRLGf? ۉҬܦƩ|Ȱ>3!viʯ>vオX3e_1zKȗ\qHS,EW[㺨uch⍸O}a>q6n6N6qN ! 1AQaq0@"2BRb#Pr3C`Scst$4D%Td ?Na3mCwxAmqmm$4n淿t'C"wzU=D\R+wp+YT&պ@ƃ3ޯ?AﶂaŘ@-Q=9Dռѻ@MVP܅G5fY6# ?0UQ,IX(6ڵ[DIMNލc&υj\XR|,4 jThAe^db#$]wOӪ1y%LYm뭛CUƃߜ}Cy1XνmF8jI]HۺиE@Ii;r8ӭVFՇ| &?3|xBMuSGe=Ӕ#BE5GY!z_eqр/W>|-Ci߇t1ޯќdR3ug=0 5[?#͏qcfH{ ?u=??ǯ}ZzhmΔBFTWPxs}G93 )gGR<>r h$'nchPBjJҧH -N1N?~}-q!=_2hcMlvY%UE@|vM2.Y[|y"EïKZF,ɯ?,q?vM 80jx";9vk+ ֧ ȺU?%vcVmA6Qg^MA}3nl QRNl8kkn'(M7m9وq%ޟ*h$Zk"$9: ?U8Sl,,|ɒxH(ѷGn/Q4PG%Ա8N! &7;eKM749R/%lc>x;>C:th?aKXbheᜋ^$Iհ hr7%F$EFdt5+(M6tÜUU|zW=aTsTgdqPQb'm1{|YXNb P~F^F:k6"j! 
Ir`1&-$Bevk:y#ywI0x=D4tUPZHڠ底taP6b>xaQ# WeFŮNjpJ* mQN*I-*ȩFg3 5Vʊɮa5FO@{NX?H]31Ri_uѕ 0 F~:60p͈SqX#a5>`o&+<2D: ڝ$nP*)N|yEjF5ټeihyZ >kbHavh-#!Po=@k̆IEN@}Ll?jO߭ʞQ|A07xwt!xfI2?Z<ץTcUj]陎Ltl }5ϓ$,Omˊ;@OjEj(ا,LXLOЦ90O .anA7j4 W_ٓzWjcBy՗+EM)dNg6y1_xp$Lv:9"zpʙ$^JԼ*ϭo=xLj6Ju82AH3$ٕ@=Vv]'qEz;I˼)=ɯx /W(Vp$ mu񶤑OqˎTr㠚xsrGCbypG1ߠw e8$⿄/M{*}W]˷.CK\ުx/$WPwr |i&}{X >$-l?-zglΆ(FhvS*b߲ڡn,|)mrH[a3ר[13o_U3TC$(=)0kgP u^=4 WYCҸ:vQרXàtkm,t*^,}D* "(I9R>``[~Q]#afi6l86:,ssN6j"A4IuQ6E,GnHzSHOuk5$I4ؤQ9@CwpBGv[]uOv0I4\yQѸ~>Z8Taqޣ;za/SI:ܫ_|>=Z8:SUIJ"IY8%b8H:QO6;7ISJҌAά3>cE+&jf$eC+z;V rʺmyeaQf&6ND.:NTvm<- uǝ\MvZYNNT-A>jr!SnO 13Ns%3D@`ܟ 1^c< aɽ̲Xë#w|ycW=9I*H8p^(4՗karOcWtO\ƍR8'KIQ?5>[}yUײ -h=% qThG2)"ו3]!kB*pFDlA,eEiHfPs5H:Փ~H0DتDIhF3c2E9H5zԑʚiX=:mxghd(v׊9iSOd@0ڽ:p5h-t&Xqӕ,ie|7A2O%PEhtjY1wЃ!  ࢽMy7\a@ţJ 4ȻF@o̒?4wx)]P~u57X 9^ܩU;Iꭆ 5 eK27({|Y׎ V\"Z1 Z}(Ǝ"1S_vE30>p; ΝD%xW?W?vo^Vidr[/&>~`9Why;R ;;ɮT?r$g1KACcKl:'3 cﳯ*"t8~l)m+U,z`(>yJ?h>]vЍG*{`;y]IT ;cNUfo¾h/$|NS1S"HVT4uhǜ]v;5͠x'C\SBplh}N ABx%ޭl/Twʽ]D=Kžr㻠l4SO?=k M: cCa#ha)ѐxcsgPiG{+xQI= zԫ+ 8"kñj=|c yCF/*9жh{ ?4o kmQNx;Y4膚aw?6>e]Qr:g,i"ԩA*M7qB?ӕFhV25r[7 Y }LR}*sg+xr2U=*'WSZDW]WǞ<叓{$9Ou4y90-1'*D`c^o?(9uݐ'PI& fJݮ:wSjfP1F:X H9dԯ˝[_54 }*;@ܨ ðynT?ןd#4rGͨH1|-#MrS3G3).᧏3vz֑r$G"`j 1tx0<ƆWh6y6,œGagAyb)hDß_mü gG;evݝnQ C-*oyaMI><]obD":GA-\%LT8c)+y76oQ#*{(F⽕y=rW\p۩cA^e6KʐcVf5$'->ՉN"F"UQ@fGb~#&M=8טJNu9D[̤so~ G9TtW^g5y$bY'سǴ=U-2 #MCt(i lj@Q 5̣i*OsxKf}\M{EV{υƇ);HIfeLȣr2>WIȂ6ik 5YOxȺ>Yf5'|H+98pjn.OyjY~iw'l;s2Y:'lgꥴ)o#'SaaKZ m}`169n"xI *+ }FP"l45'ZgE8?[X7(.Q-*ތL@̲v.5[=t\+CNܛ,gSQnH}*FG16&:t4ُ"Ạ$b |#rsaT ]ӽDP7ո0y)e$ٕvIh'QEAm*HRI=: 4牢) %_iNݧl] NtGHL ɱg<1V,J~ٹ"KQ 9HS9?@kr;we݁]I!{ @G["`J:n]{cAEVʆ#U96j#Ym\qe4hB7Cdv\MNgmAyQL4uLjj9#44tl^}LnR!t±]rh6ٍ>yҏNfU  Fm@8}/ujb9he:AyծwGpΧh5l}3p468)Udc;Us/֔YX1O2uqs`hwgr~{ RmhN؎*q 42*th>#E#HvOq}6e\,Wk#Xb>p}դ3T5†6[@Py*n|'f֧>lư΂̺SU'*qp_SM 'c6m ySʨ;MrƋmKxo,GmPAG:iw9}M(^V$ǒѽ9| aJSQarB;}ٻ֢2%Uc#gNaݕ'v[OY'3L3;,p]@S{lsX'cjwk'a.}}& dP*bK=ɍ!;3ngΊUߴmt'*{,=SzfD Ako~Gaoq_mi}#mPXhύmxǍ΂巿zfQc|kc?WY$_Lvl߶c`?ljݲˏ!V6UЂ(A4y)HpZ_x>eR$/`^'3qˏ-&Q=?CFVR 
DfV9{8gnh(P"6[D< E~0<@`G6Hгcc cK.5DdB`?XQ2ٿyqo&+1^ DW0ꊩG#QnL3c/x 11[yxპCWCcUĨ80me4.{muI=f0QRls9f9~fǨa"@8ȁQ#cicG$Gr/$W(WV"m7[mAmboD j۳ l^kh׽ # iXnveTka^Y4BNĕ0 !01@Q"2AaPq3BR?@4QT3,㺠W[=JKϞ2r^7vc:9 EߴwS#dIxu:Hp9E! V 2;73|F9Y*ʬFDu&y؟^EAA(ɩ^GV:ݜDy`Jr29ܾ㝉[E;FzxYGUeYC v-txIsםĘqEb+P\ :>iC';k|zرny]#ǿbQw(r|ӹs[D2v-%@;8<a[\o[ϧwI!*0krs)[J9^ʜp1) "/_>o<1AEy^C`x1'ܣnps`lfQ):lb>MejH^?kl3(z:1ŠK&?Q~{ٺhy/[V|6}KbXmn[-75q94dmc^h X5G-}دBޟ |rtMV+]c?-#ڛ^ǂ}LkrOu>-Dry D?:ޞUǜ7V?瓮"#rչģVR;n/_ ؉vݶe5db9/O009G5nWJpA*r9>1.[tsFnQ V 77R]ɫ8_0<՜IFu(v4Fk3E)N:yڮeP`1}$WSJSQNjٺ޵#lј(5=5lǏmoWv-1v,Wmn߀$x_DȬ0¤#QR[Vkzmw"9ZG7'[=Qj8R?zf\a=OU*oBA|G254 p.w7  &ξxGHp B%$gtЏ򤵍zHNuЯ-'40;_3 !01"@AQa2Pq#3BR?ʩcaen^8F<7;EA{EÖ1U/#d1an.1ě0ʾRh|RAo3m3 % 28Q yφHTo7lW>#i`qca m,B-j݋'mR1Ήt>Vps0IbIC.1Rea]H64B>o]($Bma!=?B KǾ+Ծ"nK*+[T#{EJSQs5:U\wĐf3܆&)IԆwE TlrTf6Q|Rh:[K zc֧GC%\_a84HcObiؖV7H )*ģK~Xhչ04?0 E<}3#u? |gS6ꊤ|I#Hڛ աwX97Ŀ%SLy6č|Fa 8b$sקhb9RAu7˨pČ_\*w묦F 4D~f|("mNKiS>$d7SlA/²SL|6N}S˯g]6; #. 403WebShell
403Webshell
Server IP : 13.127.148.211  /  Your IP : 216.73.216.13
Web Server : Apache/2.4.41 (Ubuntu)
System : Linux ip-172-31-43-195 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 06:59:36 UTC 2025 x86_64
User : www-data ( 33)
PHP Version : 7.4.3-4ubuntu2.29
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3/dist-packages/certbot/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /lib/python3/dist-packages/certbot/tests/util.py
"""Test utilities.

.. warning:: This module is not part of the public API.

"""
import logging
import shutil
import sys
import tempfile
import unittest
from multiprocessing import Process, Event

import OpenSSL
import josepy as jose
import mock
import pkg_resources
import six
from six.moves import reload_module  # pylint: disable=import-error
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

from certbot import configuration
from certbot import constants
from certbot import interfaces
from certbot import lock
from certbot import storage
from certbot import util
from certbot.compat import os
from certbot.compat import filesystem
from certbot.display import util as display_util


def vector_path(*names):
    """Return the filesystem path of a test vector stored under ``testdata``.

    :param names: path components of the vector, relative to ``testdata``

    """
    relative_path = os.path.join('testdata', *names)
    return pkg_resources.resource_filename(__name__, relative_path)


def load_vector(*names):
    """Return the bytes of a test vector, normalizing CRLF to LF for text data.

    :param names: path components of the vector, relative to ``testdata``

    :returns: content of the vector; untouched when it cannot be decoded
    :rtype: bytes

    """
    # luckily, resource_string opens file in binary mode
    raw = pkg_resources.resource_string(
        __name__, os.path.join('testdata', *names))
    try:
        text = raw.decode()
    except ValueError:
        # Decoding with the default codec failed, so this is most likely
        # not a text file: hand its bytes back unchanged.
        return raw
    # Text data: convert CRLF line endings to LF at most.
    return text.replace('\r\n', '\n').encode()


def _guess_loader(filename, loader_pem, loader_der):
    _, ext = os.path.splitext(filename)
    if ext.lower() == '.pem':
        return loader_pem
    elif ext.lower() == '.der':
        return loader_der
    else:  # pragma: no cover
        raise ValueError("Loader could not be recognized based on extension")


def load_cert(*names):
    """Load a certificate test vector through ``OpenSSL.crypto.load_certificate``.

    The encoding (PEM or ASN.1/DER) is picked from the extension of the last
    path component.

    """
    crypto = OpenSSL.crypto
    filetype = _guess_loader(names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
    return crypto.load_certificate(filetype, load_vector(*names))


def load_csr(*names):
    """Load a certificate request test vector.

    The encoding (PEM or ASN.1/DER) is picked from the extension of the last
    path component.

    """
    crypto = OpenSSL.crypto
    filetype = _guess_loader(names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
    return crypto.load_certificate_request(filetype, load_vector(*names))


def load_comparable_csr(*names):
    """Load a certificate request wrapped in ``jose.ComparableX509``."""
    csr = load_csr(*names)
    return jose.ComparableX509(csr)


def load_rsa_private_key(*names):
    """Load a private key test vector wrapped in ``jose.ComparableRSAKey``.

    The key is parsed without a password; the PEM or DER parser is picked from
    the extension of the last path component.

    """
    parse_key = _guess_loader(names[-1], serialization.load_pem_private_key,
                              serialization.load_der_private_key)
    key = parse_key(load_vector(*names), password=None, backend=default_backend())
    return jose.ComparableRSAKey(key)


def load_pyopenssl_private_key(*names):
    """Load a private key test vector through ``OpenSSL.crypto.load_privatekey``.

    The encoding (PEM or ASN.1/DER) is picked from the extension of the last
    path component.

    """
    crypto = OpenSSL.crypto
    filetype = _guess_loader(names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
    return crypto.load_privatekey(filetype, load_vector(*names))


def make_lineage(config_dir, testfile):
    """Creates a lineage defined by testfile.

    This creates the archive, live, and renewal directories if
    necessary and creates a simple lineage: the sample archive is
    copied into the archive directory, the live directory gets one
    symlink per item of ``storage.ALL_FOUR`` and the renewal conf file
    is written with every ``MAGICDIR`` occurrence replaced by
    ``config_dir``.

    :param str config_dir: path to the configuration directory
    :param str testfile: configuration file to base the lineage on;
        the lineage name is this name with its trailing ``.conf``
        (five characters) removed

    :returns: path to the renewal conf file for the created lineage
    :rtype: str

    """
    lineage_name = testfile[:-len('.conf')]

    conf_dir = os.path.join(
        config_dir, constants.RENEWAL_CONFIGS_DIR)
    archive_dir = os.path.join(
        config_dir, constants.ARCHIVE_DIR, lineage_name)
    live_dir = os.path.join(
        config_dir, constants.LIVE_DIR, lineage_name)

    for directory in (archive_dir, conf_dir, live_dir,):
        if not os.path.exists(directory):
            filesystem.makedirs(directory)

    sample_archive = vector_path('sample-archive')
    for kind in os.listdir(sample_archive):
        shutil.copyfile(os.path.join(sample_archive, kind),
                        os.path.join(archive_dir, kind))

    for kind in storage.ALL_FOUR:
        os.symlink(os.path.join(archive_dir, '{0}1.pem'.format(kind)),
                   os.path.join(live_dir, '{0}.pem'.format(kind)))

    # conf_dir is already rooted at config_dir. Joining config_dir in front of
    # it a second time only worked for absolute paths (where os.path.join
    # discards the leading component); a relative config_dir was duplicated
    # and the resulting path pointed into a directory that was never created.
    conf_path = os.path.join(conf_dir, testfile)
    with open(vector_path(testfile)) as src:
        with open(conf_path, 'w') as dst:
            dst.writelines(
                line.replace('MAGICDIR', config_dir) for line in src)

    return conf_path


def patch_get_utility(target='zope.component.getUtility'):
    """Create a patcher replacing getUtility with a validating mock IDisplay.

    The replacement is built by ``_create_get_utility_mock`` when the patch
    is started. The mock IDisplay behaves like a regular mock object, except
    that it also asserts that its methods are called with valid arguments.

    :param str target: path to patch

    :returns: mock zope.component.getUtility
    :rtype: mock.MagicMock

    """
    patcher = mock.patch(target, new_callable=_create_get_utility_mock)
    return patcher


def patch_get_utility_with_stdout(target='zope.component.getUtility',
                                  stdout=None):
    """Patch zope.component.getUtility to use a special mock IDisplay.

    The mock IDisplay works like a regular mock object, except it
    also asserts that methods are called with valid arguments.

    The `message` argument passed to the IDisplay methods is passed to
    stdout's write method.

    :param str target: path to patch
    :param object stdout: object to write standard output to; it is
        expected to have a `write` method. When omitted (``None``), a
        fresh ``six.StringIO`` is used.

    :returns: mock zope.component.getUtility
    :rtype: mock.MagicMock

    """
    # Compare against None explicitly: a truthiness test would silently
    # discard a caller-supplied writer that happens to evaluate as false.
    if stdout is None:
        stdout = six.StringIO()

    freezable_mock = _create_get_utility_mock_with_stdout(stdout)
    return mock.patch(target, new=freezable_mock)


class FreezableMock(object):
    """Mock object with the ability to freeze attributes.

    This class works like a regular mock.MagicMock object, except
    attributes and behavior set before the object is frozen cannot
    be changed during tests.

    If a func argument is provided to the constructor, this function
    is called first when an instance of FreezableMock is called,
    followed by the usual behavior defined by MagicMock. The return
    value of func is ignored.

    :param bool frozen: whether the instance is frozen right after
        construction
    :param callable func: optional function called with the call
        arguments before the underlying mock is called
    :param return_value: value returned when the instance is called;
        left to the MagicMock default when not provided

    """
    def __init__(self, frozen=False, func=None, return_value=mock.sentinel.DEFAULT):
        # Names in _frozen_set are served from this instance rather than from
        # the wrapped mock. 'freeze' is only pre-registered for instances that
        # start unfrozen, so that their freeze() method is reachable.
        self._frozen_set = set() if frozen else {'freeze', }
        self._func = func
        self._mock = mock.MagicMock()
        # Identity check against the sentinel: != would call the __ne__ of the
        # supplied value, and values overriding it (mock.ANY is never "not
        # equal" to anything) would be silently dropped.
        if return_value is not mock.sentinel.DEFAULT:
            self.return_value = return_value
        # Assigned last so that every attribute above goes through the
        # "not frozen yet" path of __setattr__, even when frozen is True.
        self._frozen = frozen

    def freeze(self):
        """Freeze object preventing further changes."""
        self._frozen = True

    def __call__(self, *args, **kwargs):
        # Run the optional hook first (its result is ignored), then delegate
        # to the wrapped mock so that calls are recorded as usual.
        if self._func is not None:
            self._func(*args, **kwargs)
        return self._mock(*args, **kwargs)

    def __getattribute__(self, name):
        if name == '_frozen':
            try:
                return object.__getattribute__(self, name)
            except AttributeError:
                # _frozen is read by __setattr__ before __init__ had a chance
                # to assign it; the object counts as unfrozen until then.
                return False
        elif name in ('return_value', 'side_effect',):
            # Always stored on the wrapped mock, see __setattr__.
            return getattr(object.__getattribute__(self, '_mock'), name)
        elif name == '_frozen_set' or name in self._frozen_set:
            # Attributes set before freezing live on this instance.
            return object.__getattribute__(self, name)
        else:
            # Everything else is looked up on the wrapped mock.
            return getattr(object.__getattribute__(self, '_mock'), name)

    def __setattr__(self, name, value):
        """ Before it is frozen, attributes are set on the FreezableMock
        instance and added to the _frozen_set. Attributes in the _frozen_set
        cannot be changed after the FreezableMock is frozen. In this case,
        they are set on the underlying _mock.

        In cases of return_value and side_effect, these attributes are always
        passed through to the instance's _mock and added to the _frozen_set
        before the object is frozen.

        :raises AttributeError: when a frozen attribute is assigned after
            the object has been frozen

        """
        if self._frozen:
            if name in self._frozen_set:
                raise AttributeError('Cannot change frozen attribute ' + name)
            else:
                return setattr(self._mock, name, value)

        # _frozen_set itself is never registered: it is being assigned while
        # it does not exist yet.
        if name != '_frozen_set':
            self._frozen_set.add(name)

        if name in ('return_value', 'side_effect'):
            return setattr(self._mock, name, value)

        return object.__setattr__(self, name, value)


def _create_get_utility_mock():
    """Build a frozen getUtility replacement returning a validating mock IDisplay.

    Every IDisplay method except ``notification`` is replaced by a frozen
    mock that first runs ``_assert_valid_call`` on its arguments.

    """
    display = FreezableMock()
    # Use pylint code for disable to keep on single line under line length limit
    method_names = interfaces.IDisplay.names()  # pylint: disable=no-member,E1120
    for method in method_names:
        if method == 'notification':
            continue
        checked_mock = FreezableMock(frozen=True, func=_assert_valid_call)
        setattr(display, method, checked_mock)
    display.freeze()
    return FreezableMock(frozen=True, return_value=display)


def _create_get_utility_mock_with_stdout(stdout):
    """Build a frozen getUtility replacement whose IDisplay echoes to stdout.

    ``notification`` only writes its message to stdout; every other
    IDisplay method validates its arguments with ``_assert_valid_call``
    before writing the message.

    :param object stdout: object with a `write` method receiving the messages

    :returns: frozen mock whose return value is the mock IDisplay
    :rtype: FreezableMock

    """
    def _write_msg(message, *unused_args, **unused_kwargs):
        """Write message to stdout, skipping empty messages."""
        if message:
            stdout.write(message)

    def mock_method(*args, **kwargs):
        """
        Mock function for IDisplay methods.
        """
        # The arguments have to be unpacked. Passing the tuple and the dict as
        # two positional values made the validator treat the whole tuple as
        # the message and never see default/cli_flag/force_interactive.
        _assert_valid_call(*args, **kwargs)
        _write_msg(*args, **kwargs)

    display = FreezableMock()
    # Use pylint code for disable to keep on single line under line length limit
    for name in interfaces.IDisplay.names():  # pylint: disable=no-member,E1120
        func = _write_msg if name == 'notification' else mock_method
        setattr(display, name, FreezableMock(frozen=True, func=func))
    display.freeze()

    return FreezableMock(frozen=True, return_value=display)


def _assert_valid_call(*args, **kwargs):
    """Forward the arguments of an IDisplay call to display_util.assert_valid_call.

    The message is the first positional argument when there is one, the
    ``message`` keyword otherwise. Missing ``default`` and ``cli_flag``
    are passed as None, a missing ``force_interactive`` as False.

    """
    message = args[0] if args else kwargs['message']
    display_util.assert_valid_call(
        message,
        default=kwargs.get('default', None),
        cli_flag=kwargs.get('cli_flag', None),
        force_interactive=kwargs.get('force_interactive', False))


class TempDirTestCase(unittest.TestCase):
    """Base test class creating a temporary directory for each test.

    The directory is available as ``self.tempdir`` and is removed again
    once the test is over.

    """

    def setUp(self):
        """Create the temporary directory."""
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        """Release logging handlers and locks, then delete the directory."""
        # Certbot normally cleans up opened resources through atexit handlers.
        # During tests those only run right before the whole test process
        # exits, i.e. after tearDown, so the cleanup is done by hand here.
        # Windows in particular refuses to delete resources still open.
        logging.shutdown()
        # The handlers are closed at this point: drop them so that future
        # tests cannot use them by accident.
        root_logger = logging.getLogger()
        root_logger.handlers = []
        util._release_locks()  # pylint: disable=protected-access

        shutil.rmtree(self.tempdir)


class ConfigTestCase(TempDirTestCase):
    """Test class providing a NamespaceConfig rooted in the temporary directory."""
    def setUp(self):
        super(ConfigTestCase, self).setUp()
        defaults = constants.CLI_DEFAULTS
        # The namespace is a mock pre-populated with the CLI default values.
        namespace = mock.MagicMock(**defaults)
        self.config = configuration.NamespaceConfig(namespace)
        self.config.verb = "certonly"
        # config_dir, work_dir and logs_dir all live inside the temp directory.
        for kind in ('config', 'work', 'logs'):
            setattr(self.config, kind + '_dir', os.path.join(self.tempdir, kind))
        self.config.cert_path = defaults['auth_cert_path']
        self.config.fullchain_path = defaults['auth_chain_path']
        self.config.chain_path = defaults['auth_chain_path']
        self.config.server = "https://example.com"


def _handle_lock(event_in, event_out, path):
    """
    Lock the given path and keep the lock until told to release it. The worker
    signals through event_out once the lock is held, then waits (20 seconds at
    most) on event_in before releasing the lock.
    :param multiprocessing.Event event_in: event object to signal when to release the lock
    :param multiprocessing.Event event_out: event object to signal when the lock is acquired
    :param path: the path to lock
    """
    # Directories and plain files are locked through different helpers.
    acquire = lock.lock_dir if os.path.isdir(path) else lock.LockFile
    my_lock = acquire(path)
    try:
        event_out.set()
        release_requested = event_in.wait(timeout=20)
        assert release_requested, 'Timeout while waiting to release the lock.'
    finally:
        my_lock.release()


def lock_and_call(callback, path_to_lock):
    """
    Grab a lock on path_to_lock from a foreign process then execute the callback.
    The foreign process is always told to release the lock and waited for, even
    when the callback raises, so that it does not keep holding the lock.
    :param callable callback: object to call after acquiring the lock
    :param str path_to_lock: path to file or directory to lock
    :raises AssertionError: if the lock is not acquired within 10 seconds or
        if the foreign process does not exit cleanly
    """
    # Reload certbot.util module to reset internal _LOCKS dictionary.
    reload_module(util)

    emit_event = Event()
    receive_event = Event()
    process = Process(target=_handle_lock, args=(emit_event, receive_event, path_to_lock))
    process.start()

    try:
        # Wait confirmation that lock is acquired
        assert receive_event.wait(timeout=10), 'Timeout while waiting to acquire the lock.'
        # Execute the callback
        callback()
    finally:
        # Trigger unlock from foreign process. This has to happen on failure
        # as well, otherwise the child keeps the lock until its own timeout.
        emit_event.set()
        # Wait for process termination
        process.join(timeout=10)

    # Checked outside of the finally clause so that it cannot mask an
    # exception raised by the callback.
    assert process.exitcode == 0


def skip_on_windows(reason):
    """Build a decorator that permanently skips a test on Windows.

    :param str reason: mandatory explanation of why the test is skipped

    """
    def wrapper(function):
        """Apply the platform dependent skip to function."""
        running_on_windows = sys.platform == 'win32'
        decorate = unittest.skipIf(running_on_windows, reason)
        return decorate(function)
    return wrapper


def temp_join(path):
    """
    Join the given path to the temporary directory of the current platform
    Eg.: 'cert' => /tmp/cert (Linux) or 'C:\\Users\\currentuser\\AppData\\Temp\\cert' (Windows)
    """
    temp_root = tempfile.gettempdir()
    return os.path.join(temp_root, path)

Youez - 2016 - github.com/yon3zu
LinuXploit