JFIFXX    $.' ",#(7),01444'9=82<.342  2!!22222222222222222222222222222222222222222222222222"4 ,PG"Z_4˷kjزZ,F+_z,© zh6٨icfu#ډb_N?wQ5-~I8TK<5oIv-k_U_~bMdӜUHh?]EwQk{_}qFW7HTՑYF?_'ϔ_Ջt=||I 6έ"D/[k9Y8ds|\Ҿp6Ҵ].6znopM[mei$[soᘨ˸ nɜG-ĨUycP3.DBli;hjx7Z^NhN3u{:jx힞#M&jL P@_ P&o89@Sz6t7#Oߋ s}YfTlmrZ)'Nk۞pw\Tȯ?8`Oi{wﭹW[r Q4F׊3m&L=h3z~#\l :F,j@ ʱwQT8"kJO6֚l}R>ډK]y&p}b;N1mr$|7>e@BTM*-iHgD) Em|ؘbҗaҾt4oG*oCNrPQ@z,|?W[0:n,jWiEW$~/hp\?{(0+Y8rΟ+>S-SVN;}s?. w9˟<Mq4Wv'{)01mBVW[8/< %wT^5b)iM pgN&ݝVO~qu9 !J27$O-! :%H ـyΠM=t{!S oK8txA& j0 vF Y|y ~6@c1vOpIg4lODL Rcj_uX63?nkWyf;^*B @~a`Eu+6L.ü>}y}_O6͐:YrGXkGl^w~㒶syIu! W XN7BVO!X2wvGRfT#t/?%8^WaTGcLMI(J1~8?aT ]ASE(*E} 2#I/׍qz^t̔bYz4xt){ OH+(EA&NXTo"XC')}Jzp ~5}^+6wcQ|LpdH}(.|kc4^"Z?ȕ a<L!039C EuCFEwç ;n?*oB8bʝ'#RqfM}7]s2tcS{\icTx;\7KPʇ Z O-~c>"?PEO8@8GQgaՎ󁶠䧘_%#r>1zaebqcPѵn#L =׀t L7`VA{C:ge@w1 Xp3c3ġpM"'-@n4fGB3DJ8[JoߐgK)ƛ$ 83+ 6ʻ SkI*KZlT _`?KQKdB`s}>`*>,*@JdoF*弝O}ks]yߘc1GV<=776qPTtXԀ!9*44Tހ3XΛex46YD  BdemDa\_l,G/֌7Y](xTt^%GE4}bTڹ;Y)BQu>J/J ⮶.XԄjݳ+Ed r5_D1 o Bx΢#<W8R6@gM. drD>(otU@x=~v2 ӣdoBd3eO6㣷ݜ66YQz`S{\P~z m5{J/L1xO\ZFu>ck#&:`$ai>2ΔloF[hlEܺΠk:)` $[69kOw\|8}ބ:񶐕IA1/=2[,!.}gN#ub ~݊}34qdELc$"[qU硬g^%B zrpJru%v\h1Yne`ǥ:gpQM~^Xi `S:V29.PV?Bk AEvw%_9CQwKekPؠ\;Io d{ ߞoc1eP\ `E=@KIRYK2NPlLɀ)&eB+ь( JTx_?EZ }@ 6U뙢طzdWIn` D噥[uV"G&Ú2g}&m?ċ"Om# {ON"SXNeysQ@FnVgdX~nj]J58up~.`r\O,ư0oS _Ml4kv\JSdxSW<AeIX$Iw:Sy›R9Q[,5;@]%u@ *rolbI  +%m:͇ZVủθau,RW33 dJeTYE.Mϧ-oj3+yy^cVO9NV\nd1 !͕_)av;թMlWR1)ElP;yوÏu 3k5Pr6<⒲l!˞*u־n!l:UNW %Chx8vL'X@*)̮ˍ D-M+JUkvK+x8cY?Ԡ~3mo|u@[XeYC\Kpx8oCC&N~3-H MXsu<`~"WL$8ξ3a)|:@m\^`@ҷ)5p+6p%i)P Mngc#0AruzRL+xSS?ʮ}()#tmˇ!0}}y$6Lt;$ʳ{^6{v6ķܰgVcnn ~zx«,2u?cE+ȘH؎%Za)X>uWTzNyosFQƤ$*&LLXL)1" LeOɟ9=:tZcŽY?ӭVwv~,Yrۗ|yGaFC.+ v1fήJ]STBn5sW}y$~z'c 8  ,! pVNSNNqy8z˱A4*'2n<s^ǧ˭PJޮɏUGLJ*#i}K%,)[z21z ?Nin1?TIR#m-1lA`fT5+ܐcq՝ʐ,3f2Uեmab#ŠdQy>\)SLYw#.ʑf ,"+w~N'cO3FN<)j&,- љ֊_zSTǦw>?nU仆Ve0$CdrP m׈eXmVu L.bֹ [Դaզ*\y8Է:Ez\0KqC b̘cөQ=0YsNS.3.Oo:#v7[#߫ 5܎LEr49nCOWlG^0k%;YߝZǓ:S#|}y,/kLd TA(AI$+I3;Y*Z}|ӧOdv..#:nf>>ȶITX 8y"dR|)0=n46ⲑ+ra ~]R̲c?6(q;5% |uj~z8R=XIV=|{vGj\gcqz؋%Mߍ1y#@f^^>N#x#۹6Y~?dfPO{P4Vu1E1J *|%JN`eWuzk M6q t[ gGvWIGu_ft5j"Y:Tɐ*; e54q$C2d} _SL#mYpO.C;cHi#֩%+) ӍƲVSYźg |tj38r|V1#;.SQA[S#`n+$$I P\[@s(EDzP])8G#0B[ىXIIq<9~[Z멜Z⊔IWU&A>P~#dp]9 "cP Md?٥Ifتuk/F9c*9Ǎ:ØFzn*@|Iށ9N3{'['ͬҲ4#}!V Fu,,mTIkv C7vB6kT91*l '~ƞFlU'M ][ΩũJ_{iIn$L jOdxkza۪#EClx˘oVɞljr)/,߬hL#^Lф,íMƁe̩NBLiLq}(q6IçJ$WE$:=#(KBzђ xlx?>Պ+>W,Ly!_DŌlQ![ SJ1ƐY}b,+Loxɓ)=yoh@꥟/Iѭ=Py9 ۍYӘe+pJnϱ?V\SO%(t =?MR[Șd/ nlB7j !;ӥ/[-A>dNsLj ,ɪv=1c.SQO3UƀܽE̻9GϷD7(}Ävӌ\y_0[w <΍>a_[0+LF.޺f>oNTq;y\bՃyjH<|q-eɏ_?_9+PHp$[uxK wMwNی'$Y2=qKBP~Yul:[<F12O5=d]Ysw:ϮEj,_QXz`H1,#II dwrP˂@ZJVy$\y{}^~[:NߌUOdؾe${p>G3cĖlʌ ת[`ϱ-WdgIig2 }s ؤ(%#sS@~3XnRG~\jc3vӍLM[JBTs3}jNʖW;7ç?=XF=-=qߚ#='c7ڑWI(O+=:uxqe2zi+kuGR0&eniT^J~\jyp'dtGsO39* b#Ɋ p[BwsT>d4ۧsnvnU_~,vƜJ1s QIz)(lv8MU=;56Gs#KMP=LvyGd}VwWBF'à ?MHUg2 !p7Qjڴ=ju JnA suMeƆҔ!)'8Ϣٔޝ(Vpצ֖d=ICJǠ{qkԭ߸i@Ku|p=..*+xz[Aqġ#s2aƊRR)*HRsi~a &fMP-KL@ZXy'x{}Zm+:)) IJ-iu ܒH'L(7yGӜq j 6ߌg1go,kرtY?W,pefOQS!K۟cҒA|սj>=⬒˧L[ ߿2JaB~Ru:Q] 0H~]7ƼI(}cq 'ήETq?fabӥvr )o-Q_'ᴎoK;Vo%~OK *bf:-ťIR`B5!RB@ï u ̯e\_U_ gES3QTaxU<~c?*#]MW,[8Oax]1bC|踤Plw5V%){t<d50iXSUm:Z┵i"1^B-PhJ&)O*DcWvM)}Pܗ-q\mmζZ-l@}aE6F@&Sg@ݚM ȹ 4#p\HdYDoH"\..RBHz_/5˘6KhJRPmƶim3,#ccoqa)*PtRmk7xDE\Y閣_X<~)c[[BP6YqS0%_;Àv~| VS؇ 'O0F0\U-d@7SJ*z3nyPOm~P3|Yʉr#CSN@ ƮRN)r"C:: #qbY. 6[2K2uǦHYRQMV G$Q+.>nNHq^ qmMVD+-#*U̒ p욳u:IBmPV@Or[b= 1UE_NmyKbNOU}the`|6֮P>\2PVIDiPO;9rmAHGWS]J*_G+kP2KaZH'KxWMZ%OYDRc+o?qGhmdSoh\D|:WUAQc yTq~^H/#pCZTI1ӏT4"ČZ}`w#*,ʹ 0i課Om*da^gJ݅{le9uF#Tֲ̲ٞC"qߍ ոޑo#XZTp@ o8(jdxw],f`~|,s^f1t|m򸄭/ctr5s79Q4H1꠲BB@l9@C+wpxu£Yc9?`@#omHs2)=2.ljg9$YS%*LRY7Z,*=䷘$armoϰUW.|rufIGwtZwo~5 YյhO+=8fF)W7L9lM̘·Y֘YLf큹pRF99.A "wz=E\Z'a 2Ǚ#;'}G*l^"q+2FQ hjkŦ${ޮ-T٭cf|3#~RJt$b(R(rdx >U b&9,>%E\ Άe$'q't*אެb-|dSBOO$R+H)܎K1m`;J2Y~9Og8=vqD`K[F)k[1m޼cn]skz$@)!I x՝"v9=ZA=`Ɠi :E)`7vI}dYI_ o:obo 3Q&D&2= Ά;>hy.*ⅥSӬ+q&j|UƧ}J0WW< ۋS)jQRjƯrN)Gű4Ѷ(S)Ǣ8iW52No˓ ۍ%5brOnL;n\G=^UdI8$&h'+(cȁ߫klS^cƗjԌEꭔgFȒ@}O*;evWVYJ\]X'5ղkFb 6Ro՜mi Ni>J?lPmU}>_Z&KKqrIDՉ~q3fL:Se>E-G{L6pe,8QIhaXaUA'ʂs+טIjP-y8ۈZ?J$WP Rs]|l(ԓsƊio(S0Y 8T97.WiLc~dxcE|2!XKƘਫ਼$((6~|d9u+qd^389Y6L.I?iIq9)O/뚅OXXVZF[یgQLK1RҖr@v#XlFНyS87kF!AsM^rkpjPDyS$Nqnxҍ!Uf!ehi2m`YI9r6 TFC}/y^Η5d'9A-J>{_l+`A['յϛ#w:݅%X}&PStQ"-\縵/$ƗhXb*yBS;Wջ_mcvt?2}1;qSdd~u:2k52R~z+|HE!)Ǟl7`0<,2*Hl-x^'_TVgZA'j ^2ΪN7t?w x1fIzC-ȖK^q;-WDvT78Z hK(P:Q- 8nZ܃e貾<1YT<,"6{/ ?͟|1:#gW>$dJdB=jf[%rE^il:BxSּ1հ,=*7 fcG#q eh?27,!7x6nLC4x},GeǝtC.vS F43zz\;QYC,6~;RYS/6|25vTimlv& nRh^ejRLGf? ۉҬܦƩ|Ȱ>3!viʯ>vオX3e_1zKȗ\qHS,EW[㺨uch⍸O}a>q6n6N6qN ! 1AQaq0@"2BRb#Pr3C`Scst$4D%Td ?Na3mCwxAmqmm$4n淿t'C"wzU=D\R+wp+YT&պ@ƃ3ޯ?AﶂaŘ@-Q=9Dռѻ@MVP܅G5fY6# ?0UQ,IX(6ڵ[DIMNލc&υj\XR|,4 jThAe^db#$]wOӪ1y%LYm뭛CUƃߜ}Cy1XνmF8jI]HۺиE@Ii;r8ӭVFՇ| &?3|xBMuSGe=Ӕ#BE5GY!z_eqр/W>|-Ci߇t1ޯќdR3ug=0 5[?#͏qcfH{ ?u=??ǯ}ZzhmΔBFTWPxs}G93 )gGR<>r h$'nchPBjJҧH -N1N?~}-q!=_2hcMlvY%UE@|vM2.Y[|y"EïKZF,ɯ?,q?vM 80jx";9vk+ ֧ ȺU?%vcVmA6Qg^MA}3nl QRNl8kkn'(M7m9وq%ޟ*h$Zk"$9: ?U8Sl,,|ɒxH(ѷGn/Q4PG%Ա8N! &7;eKM749R/%lc>x;>C:th?aKXbheᜋ^$Iհ hr7%F$EFdt5+(M6tÜUU|zW=aTsTgdqPQb'm1{|YXNb P~F^F:k6"j! Ir`1&-$Bevk:y#ywI0x=D4tUPZHڠ底taP6b>xaQ# WeFŮNjpJ* mQN*I-*ȩFg3 5Vʊɮa5FO@{NX?H]31Ri_uѕ 0 F~:60p͈SqX#a5>`o&+<2D: ڝ$nP*)N|yEjF5ټeihyZ >kbHavh-#!Po=@k̆IEN@}Ll?jO߭ʞQ|A07xwt!xfI2?Z<ץTcUj]陎Ltl }5ϓ$,Omˊ;@OjEj(ا,LXLOЦ90O .anA7j4 W_ٓzWjcBy՗+EM)dNg6y1_xp$Lv:9"zpʙ$^JԼ*ϭo=xLj6Ju82AH3$ٕ@=Vv]'qEz;I˼)=ɯx /W(Vp$ mu񶤑OqˎTr㠚xsrGCbypG1ߠw e8$⿄/M{*}W]˷.CK\ުx/$WPwr |i&}{X >$-l?-zglΆ(FhvS*b߲ڡn,|)mrH[a3ר[13o_U3TC$(=)0kgP u^=4 WYCҸ:vQרXàtkm,t*^,}D* "(I9R>``[~Q]#afi6l86:,ssN6j"A4IuQ6E,GnHzSHOuk5$I4ؤQ9@CwpBGv[]uOv0I4\yQѸ~>Z8Taqޣ;za/SI:ܫ_|>=Z8:SUIJ"IY8%b8H:QO6;7ISJҌAά3>cE+&jf$eC+z;V rʺmyeaQf&6ND.:NTvm<- uǝ\MvZYNNT-A>jr!SnO 13Ns%3D@`ܟ 1^c< aɽ̲Xë#w|ycW=9I*H8p^(4՗karOcWtO\ƍR8'KIQ?5>[}yUײ -h=% qThG2)"ו3]!kB*pFDlA,eEiHfPs5H:Փ~H0DتDIhF3c2E9H5zԑʚiX=:mxghd(v׊9iSOd@0ڽ:p5h-t&Xqӕ,ie|7A2O%PEhtjY1wЃ!  ࢽMy7\a@ţJ 4ȻF@o̒?4wx)]P~u57X 9^ܩU;Iꭆ 5 eK27({|Y׎ V\"Z1 Z}(Ǝ"1S_vE30>p; ΝD%xW?W?vo^Vidr[/&>~`9Why;R ;;ɮT?r$g1KACcKl:'3 cﳯ*"t8~l)m+U,z`(>yJ?h>]vЍG*{`;y]IT ;cNUfo¾h/$|NS1S"HVT4uhǜ]v;5͠x'C\SBplh}N ABx%ޭl/Twʽ]D=Kžr㻠l4SO?=k M: cCa#ha)ѐxcsgPiG{+xQI= zԫ+ 8"kñj=|c yCF/*9жh{ ?4o kmQNx;Y4膚aw?6>e]Qr:g,i"ԩA*M7qB?ӕFhV25r[7 Y }LR}*sg+xr2U=*'WSZDW]WǞ<叓{$9Ou4y90-1'*D`c^o?(9uݐ'PI& fJݮ:wSjfP1F:X H9dԯ˝[_54 }*;@ܨ ðynT?ןd#4rGͨH1|-#MrS3G3).᧏3vz֑r$G"`j 1tx0<ƆWh6y6,œGagAyb)hDß_mü gG;evݝnQ C-*oyaMI><]obD":GA-\%LT8c)+y76oQ#*{(F⽕y=rW\p۩cA^e6KʐcVf5$'->ՉN"F"UQ@fGb~#&M=8טJNu9D[̤so~ G9TtW^g5y$bY'سǴ=U-2 #MCt(i lj@Q 5̣i*OsxKf}\M{EV{υƇ);HIfeLȣr2>WIȂ6ik 5YOxȺ>Yf5'|H+98pjn.OyjY~iw'l;s2Y:'lgꥴ)o#'SaaKZ m}`169n"xI *+ }FP"l45'ZgE8?[X7(.Q-*ތL@̲v.5[=t\+CNܛ,gSQnH}*FG16&:t4ُ"Ạ$b |#rsaT ]ӽDP7ո0y)e$ٕvIh'QEAm*HRI=: 4牢) %_iNݧl] NtGHL ɱg<1V,J~ٹ"KQ 9HS9?@kr;we݁]I!{ @G["`J:n]{cAEVʆ#U96j#Ym\qe4hB7Cdv\MNgmAyQL4uLjj9#44tl^}LnR!t±]rh6ٍ>yҏNfU  Fm@8}/ujb9he:AyծwGpΧh5l}3p468)Udc;Us/֔YX1O2uqs`hwgr~{ RmhN؎*q 42*th>#E#HvOq}6e\,Wk#Xb>p}դ3T5†6[@Py*n|'f֧>lư΂̺SU'*qp_SM 'c6m ySʨ;MrƋmKxo,GmPAG:iw9}M(^V$ǒѽ9| aJSQarB;}ٻ֢2%Uc#gNaݕ'v[OY'3L3;,p]@S{lsX'cjwk'a.}}& dP*bK=ɍ!;3ngΊUߴmt'*{,=SzfD Ako~Gaoq_mi}#mPXhύmxǍ΂巿zfQc|kc?WY$_Lvl߶c`?ljݲˏ!V6UЂ(A4y)HpZ_x>eR$/`^'3qˏ-&Q=?CFVR DfV9{8gnh(P"6[D< E~0<@`G6Hгcc cK.5DdB`?XQ2ٿyqo&+1^ DW0ꊩG#QnL3c/x 11[yxპCWCcUĨ80me4.{muI=f0QRls9f9~fǨa"@8ȁQ#cicG$Gr/$W(WV"m7[mAmboD j۳ l^kh׽ # iXnveTka^Y4BNĕ0 !01@Q"2AaPq3BR?@4QT3,㺠W[=JKϞ2r^7vc:9 EߴwS#dIxu:Hp9E! V 2;73|F9Y*ʬFDu&y؟^EAA(ɩ^GV:ݜDy`Jr29ܾ㝉[E;FzxYGUeYC v-txIsםĘqEb+P\ :>iC';k|zرny]#ǿbQw(r|ӹs[D2v-%@;8<a[\o[ϧwI!*0krs)[J9^ʜp1) "/_>o<1AEy^C`x1'ܣnps`lfQ):lb>MejH^?kl3(z:1ŠK&?Q~{ٺhy/[V|6}KbXmn[-75q94dmc^h X5G-}دBޟ |rtMV+]c?-#ڛ^ǂ}LkrOu>-Dry D?:ޞUǜ7V?瓮"#rչģVR;n/_ ؉vݶe5db9/O009G5nWJpA*r9>1.[tsFnQ V 77R]ɫ8_0<՜IFu(v4Fk3E)N:yڮeP`1}$WSJSQNjٺ޵#lј(5=5lǏmoWv-1v,Wmn߀$x_DȬ0¤#QR[Vkzmw"9ZG7'[=Qj8R?zf\a=OU*oBA|G254 p.w7  &ξxGHp B%$gtЏ򤵍zHNuЯ-'40;_3 !01"@AQa2Pq#3BR?ʩcaen^8F<7;EA{EÖ1U/#d1an.1ě0ʾRh|RAo3m3 % 28Q yφHTo7lW>#i`qca m,B-j݋'mR1Ήt>Vps0IbIC.1Rea]H64B>o]($Bma!=?B KǾ+Ծ"nK*+[T#{EJSQs5:U\wĐf3܆&)IԆwE TlrTf6Q|Rh:[K zc֧GC%\_a84HcObiؖV7H )*ģK~Xhչ04?0 E<}3#u? |gS6ꊤ|I#Hڛ աwX97Ŀ%SLy6č|Fa 8b$sקhb9RAu7˨pČ_\*w묦F 4D~f|("mNKiS>$d7SlA/²SL|6N}S˯g]6; #. 403WebShell
403Webshell
Server IP : 13.127.148.211  /  Your IP : 216.73.216.149
Web Server : Apache/2.4.41 (Ubuntu)
System : Linux ip-172-31-43-195 5.15.0-1084-aws #91~20.04.1-Ubuntu SMP Fri May 2 06:59:36 UTC 2025 x86_64
User : www-data ( 33)
PHP Version : 7.4.3-4ubuntu2.29
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python3/dist-packages/twisted/test/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3/dist-packages/twisted/test//test_adbapi.py
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for twisted.enterprise.adbapi.
"""

from twisted.trial import unittest

import os
import stat

from twisted.enterprise.adbapi import ConnectionPool, ConnectionLost
from twisted.enterprise.adbapi import Connection, Transaction
from twisted.internet import reactor, defer, interfaces
from twisted.python.failure import Failure
from twisted.python.reflect import requireModule

simple_table_schema = """
CREATE TABLE simple (
  x integer
)
"""



class ADBAPITestBase(object):
    """
    Test the asynchronous DB-API code.
    """
    openfun_called = {}

    if interfaces.IReactorThreads(reactor, None) is None:
        skip = "ADB-API requires threads, no way to test without them"


    def extraSetUp(self):
        """
        Set up the database and create a connection pool pointing at it.
        """
        self.startDB()
        self.dbpool = self.makePool(cp_openfun=self.openfun)
        self.dbpool.start()


    def tearDown(self):
        d =  self.dbpool.runOperation('DROP TABLE simple')
        d.addCallback(lambda res: self.dbpool.close())
        d.addCallback(lambda res: self.stopDB())
        return d


    def openfun(self, conn):
        self.openfun_called[conn] = True


    def checkOpenfunCalled(self, conn=None):
        if not conn:
            self.assertTrue(self.openfun_called)
        else:
            self.assertIn(conn, self.openfun_called)


    def test_pool(self):
        d = self.dbpool.runOperation(simple_table_schema)
        if self.test_failures:
            d.addCallback(self._testPool_1_1)
            d.addCallback(self._testPool_1_2)
            d.addCallback(self._testPool_1_3)
            d.addCallback(self._testPool_1_4)
            d.addCallback(lambda res: self.flushLoggedErrors())
        d.addCallback(self._testPool_2)
        d.addCallback(self._testPool_3)
        d.addCallback(self._testPool_4)
        d.addCallback(self._testPool_5)
        d.addCallback(self._testPool_6)
        d.addCallback(self._testPool_7)
        d.addCallback(self._testPool_8)
        d.addCallback(self._testPool_9)
        return d


    def _testPool_1_1(self, res):
        d = defer.maybeDeferred(self.dbpool.runQuery, "select * from NOTABLE")
        d.addCallbacks(lambda res: self.fail('no exception'),
                       lambda f: None)
        return d


    def _testPool_1_2(self, res):
        d = defer.maybeDeferred(self.dbpool.runOperation,
                                "deletexxx from NOTABLE")
        d.addCallbacks(lambda res: self.fail('no exception'),
                       lambda f: None)
        return d


    def _testPool_1_3(self, res):
        d = defer.maybeDeferred(self.dbpool.runInteraction,
                                self.bad_interaction)
        d.addCallbacks(lambda res: self.fail('no exception'),
                       lambda f: None)
        return d


    def _testPool_1_4(self, res):
        d = defer.maybeDeferred(self.dbpool.runWithConnection,
                                self.bad_withConnection)
        d.addCallbacks(lambda res: self.fail('no exception'),
                       lambda f: None)
        return d


    def _testPool_2(self, res):
        # verify simple table is empty
        sql = "select count(1) from simple"
        d = self.dbpool.runQuery(sql)
        def _check(row):
            self.assertTrue(int(row[0][0]) == 0, "Interaction not rolled back")
            self.checkOpenfunCalled()
        d.addCallback(_check)
        return d


    def _testPool_3(self, res):
        sql = "select count(1) from simple"
        inserts = []
        # add some rows to simple table (runOperation)
        for i in range(self.num_iterations):
            sql = "insert into simple(x) values(%d)" % i
            inserts.append(self.dbpool.runOperation(sql))
        d = defer.gatherResults(inserts)

        def _select(res):
            # make sure they were added (runQuery)
            sql = "select x from simple order by x";
            d = self.dbpool.runQuery(sql)
            return d
        d.addCallback(_select)

        def _check(rows):
            self.assertTrue(len(rows) == self.num_iterations,
                            "Wrong number of rows")
            for i in range(self.num_iterations):
                self.assertTrue(len(rows[i]) == 1, "Wrong size row")
                self.assertTrue(rows[i][0] == i, "Values not returned.")
        d.addCallback(_check)

        return d


    def _testPool_4(self, res):
        # runInteraction
        d = self.dbpool.runInteraction(self.interaction)
        d.addCallback(lambda res: self.assertEqual(res, "done"))
        return d


    def _testPool_5(self, res):
        # withConnection
        d = self.dbpool.runWithConnection(self.withConnection)
        d.addCallback(lambda res: self.assertEqual(res, "done"))
        return d


    def _testPool_6(self, res):
        # Test a withConnection cannot be closed
        d = self.dbpool.runWithConnection(self.close_withConnection)
        return d


    def _testPool_7(self, res):
        # give the pool a workout
        ds = []
        for i in range(self.num_iterations):
            sql = "select x from simple where x = %d" % i
            ds.append(self.dbpool.runQuery(sql))
        dlist = defer.DeferredList(ds, fireOnOneErrback=True)
        def _check(result):
            for i in range(self.num_iterations):
                self.assertTrue(result[i][1][0][0] == i, "Value not returned")
        dlist.addCallback(_check)
        return dlist


    def _testPool_8(self, res):
        # now delete everything
        ds = []
        for i in range(self.num_iterations):
            sql = "delete from simple where x = %d" % i
            ds.append(self.dbpool.runOperation(sql))
        dlist = defer.DeferredList(ds, fireOnOneErrback=True)
        return dlist


    def _testPool_9(self, res):
        # verify simple table is empty
        sql = "select count(1) from simple"
        d = self.dbpool.runQuery(sql)
        def _check(row):
            self.assertTrue(int(row[0][0]) == 0,
                            "Didn't successfully delete table contents")
            self.checkConnect()
        d.addCallback(_check)
        return d


    def checkConnect(self):
        """Check the connect/disconnect synchronous calls."""
        conn = self.dbpool.connect()
        self.checkOpenfunCalled(conn)
        curs = conn.cursor()
        curs.execute("insert into simple(x) values(1)")
        curs.execute("select x from simple")
        res = curs.fetchall()
        self.assertEqual(len(res), 1)
        self.assertEqual(len(res[0]), 1)
        self.assertEqual(res[0][0], 1)
        curs.execute("delete from simple")
        curs.execute("select x from simple")
        self.assertEqual(len(curs.fetchall()), 0)
        curs.close()
        self.dbpool.disconnect(conn)


    def interaction(self, transaction):
        transaction.execute("select x from simple order by x")
        for i in range(self.num_iterations):
            row = transaction.fetchone()
            self.assertTrue(len(row) == 1, "Wrong size row")
            self.assertTrue(row[0] == i, "Value not returned.")
        self.assertIsNone(transaction.fetchone(), "Too many rows")
        return "done"


    def bad_interaction(self, transaction):
        if self.can_rollback:
            transaction.execute("insert into simple(x) values(0)")

        transaction.execute("select * from NOTABLE")


    def withConnection(self, conn):
        curs = conn.cursor()
        try:
            curs.execute("select x from simple order by x")
            for i in range(self.num_iterations):
                row = curs.fetchone()
                self.assertTrue(len(row) == 1, "Wrong size row")
                self.assertTrue(row[0] == i, "Value not returned.")
        finally:
            curs.close()
        return "done"


    def close_withConnection(self, conn):
        conn.close()


    def bad_withConnection(self, conn):
        curs = conn.cursor()
        try:
            curs.execute("select * from NOTABLE")
        finally:
            curs.close()



class ReconnectTestBase(object):
    """
    Test the asynchronous DB-API code with reconnect.
    """

    if interfaces.IReactorThreads(reactor, None) is None:
        skip = "ADB-API requires threads, no way to test without them"


    def extraSetUp(self):
        """
        Skip the test if C{good_sql} is unavailable.  Otherwise, set up the
        database, create a connection pool pointed at it, and set up a simple
        schema in it.
        """
        if self.good_sql is None:
            raise unittest.SkipTest('no good sql for reconnect test')
        self.startDB()
        self.dbpool = self.makePool(cp_max=1, cp_reconnect=True,
                                    cp_good_sql=self.good_sql)
        self.dbpool.start()
        return self.dbpool.runOperation(simple_table_schema)


    def tearDown(self):
        d = self.dbpool.runOperation('DROP TABLE simple')
        d.addCallback(lambda res: self.dbpool.close())
        d.addCallback(lambda res: self.stopDB())
        return d


    def test_pool(self):
        d = defer.succeed(None)
        d.addCallback(self._testPool_1)
        d.addCallback(self._testPool_2)
        if not self.early_reconnect:
            d.addCallback(self._testPool_3)
        d.addCallback(self._testPool_4)
        d.addCallback(self._testPool_5)
        return d


    def _testPool_1(self, res):
        sql = "select count(1) from simple"
        d = self.dbpool.runQuery(sql)
        def _check(row):
            self.assertTrue(int(row[0][0]) == 0, "Table not empty")
        d.addCallback(_check)
        return d


    def _testPool_2(self, res):
        # reach in and close the connection manually
        list(self.dbpool.connections.values())[0].close()


    def _testPool_3(self, res):
        sql = "select count(1) from simple"
        d = defer.maybeDeferred(self.dbpool.runQuery, sql)
        d.addCallbacks(lambda res: self.fail('no exception'),
                       lambda f: None)
        return d


    def _testPool_4(self, res):
        sql = "select count(1) from simple"
        d = self.dbpool.runQuery(sql)
        def _check(row):
            self.assertTrue(int(row[0][0]) == 0, "Table not empty")
        d.addCallback(_check)
        return d


    def _testPool_5(self, res):
        self.flushLoggedErrors()
        sql = "select * from NOTABLE" # bad sql
        d = defer.maybeDeferred(self.dbpool.runQuery, sql)
        d.addCallbacks(lambda res: self.fail('no exception'),
                       lambda f: self.assertFalse(f.check(ConnectionLost)))
        return d



class DBTestConnector(object):
    """
    A class which knows how to test for the presence of
    and establish a connection to a relational database.

    To enable test cases  which use a central, system database,
    you must create a database named DB_NAME with a user DB_USER
    and password DB_PASS with full access rights to database DB_NAME.
    """
    TEST_PREFIX = None # used for creating new test cases

    DB_NAME = "twisted_test"
    DB_USER = 'twisted_test'
    DB_PASS = 'twisted_test'

    DB_DIR = None # directory for database storage

    nulls_ok = True # nulls supported
    trailing_spaces_ok = True # trailing spaces in strings preserved
    can_rollback = True # rollback supported
    test_failures = True # test bad sql?
    escape_slashes = True # escape \ in sql?
    good_sql = ConnectionPool.good_sql
    early_reconnect = True # cursor() will fail on closed connection
    can_clear = True # can try to clear out tables when starting

    num_iterations = 50 # number of iterations for test loops
                        # (lower this for slow db's)

    def setUp(self):
        self.DB_DIR = self.mktemp()
        os.mkdir(self.DB_DIR)
        if not self.can_connect():
            raise unittest.SkipTest('%s: Cannot access db' % self.TEST_PREFIX)
        return self.extraSetUp()


    def can_connect(self):
        """Return true if this database is present on the system
        and can be used in a test."""
        raise NotImplementedError()


    def startDB(self):
        """Take any steps needed to bring database up."""
        pass


    def stopDB(self):
        """Bring database down, if needed."""
        pass


    def makePool(self, **newkw):
        """Create a connection pool with additional keyword arguments."""
        args, kw = self.getPoolArgs()
        kw = kw.copy()
        kw.update(newkw)
        return ConnectionPool(*args, **kw)


    def getPoolArgs(self):
        """Return a tuple (args, kw) of list and keyword arguments
        that need to be passed to ConnectionPool to create a connection
        to this database."""
        raise NotImplementedError()



class SQLite3Connector(DBTestConnector):
    """
    Connector that uses the stdlib SQLite3 database support.
    """
    TEST_PREFIX = 'SQLite3'
    escape_slashes = False
    num_iterations = 1 # slow

    def can_connect(self):
        if requireModule('sqlite3') is None:
            return False
        else:
            return True


    def startDB(self):
        self.database = os.path.join(self.DB_DIR, self.DB_NAME)
        if os.path.exists(self.database):
            os.unlink(self.database)


    def getPoolArgs(self):
        args = ('sqlite3',)
        kw = {'database': self.database,
              'cp_max': 1,
              'check_same_thread': False}
        return args, kw



class PySQLite2Connector(DBTestConnector):
    """
    Connector that uses pysqlite's SQLite database support.
    """
    TEST_PREFIX = 'pysqlite2'
    escape_slashes = False
    num_iterations = 1 # slow

    def can_connect(self):
        if requireModule('pysqlite2.dbapi2') is None:
            return False
        else:
            return True


    def startDB(self):
        self.database = os.path.join(self.DB_DIR, self.DB_NAME)
        if os.path.exists(self.database):
            os.unlink(self.database)


    def getPoolArgs(self):
        args = ('pysqlite2.dbapi2',)
        kw = {'database': self.database,
              'cp_max': 1,
              'check_same_thread': False}
        return args, kw



class PyPgSQLConnector(DBTestConnector):
    TEST_PREFIX = "PyPgSQL"

    def can_connect(self):
        try: from pyPgSQL import PgSQL
        except: return False
        try:
            conn = PgSQL.connect(database=self.DB_NAME, user=self.DB_USER,
                                 password=self.DB_PASS)
            conn.close()
            return True
        except:
            return False


    def getPoolArgs(self):
        args = ('pyPgSQL.PgSQL',)
        kw = {'database': self.DB_NAME, 'user': self.DB_USER,
              'password': self.DB_PASS, 'cp_min': 0}
        return args, kw



class PsycopgConnector(DBTestConnector):
    TEST_PREFIX = 'Psycopg'

    def can_connect(self):
        try: import psycopg
        except: return False
        try:
            conn = psycopg.connect(database=self.DB_NAME, user=self.DB_USER,
                                   password=self.DB_PASS)
            conn.close()
            return True
        except:
            return False


    def getPoolArgs(self):
        args = ('psycopg',)
        kw = {'database': self.DB_NAME, 'user': self.DB_USER,
              'password': self.DB_PASS, 'cp_min': 0}
        return args, kw



class MySQLConnector(DBTestConnector):
    TEST_PREFIX = 'MySQL'

    trailing_spaces_ok = False
    can_rollback = False
    early_reconnect = False

    def can_connect(self):
        try: import MySQLdb
        except: return False
        try:
            conn = MySQLdb.connect(db=self.DB_NAME, user=self.DB_USER,
                                   passwd=self.DB_PASS)
            conn.close()
            return True
        except:
            return False


    def getPoolArgs(self):
        args = ('MySQLdb',)
        kw = {'db': self.DB_NAME, 'user': self.DB_USER, 'passwd': self.DB_PASS}
        return args, kw



class FirebirdConnector(DBTestConnector):
    TEST_PREFIX = 'Firebird'

    test_failures = False # failure testing causes problems
    escape_slashes = False
    good_sql = None # firebird doesn't handle failed sql well
    can_clear = False # firebird is not so good

    num_iterations = 5 # slow


    def can_connect(self):
        if requireModule('kinterbasdb') is None:
            return False
        try:
            self.startDB()
            self.stopDB()
            return True
        except:
            return False


    def startDB(self):
        import kinterbasdb
        self.DB_NAME = os.path.join(self.DB_DIR, DBTestConnector.DB_NAME)
        os.chmod(self.DB_DIR, stat.S_IRWXU + stat.S_IRWXG + stat.S_IRWXO)
        sql = 'create database "%s" user "%s" password "%s"'
        sql %= (self.DB_NAME, self.DB_USER, self.DB_PASS);
        conn = kinterbasdb.create_database(sql)
        conn.close()


    def getPoolArgs(self):
        args = ('kinterbasdb',)
        kw = {'database': self.DB_NAME, 'host': '127.0.0.1',
              'user': self.DB_USER, 'password': self.DB_PASS}
        return args, kw


    def stopDB(self):
        import kinterbasdb
        conn = kinterbasdb.connect(database=self.DB_NAME,
                                   host='127.0.0.1', user=self.DB_USER,
                                   password=self.DB_PASS)
        conn.drop_database()



def makeSQLTests(base, suffix, globals):
    """
    Make a test case for every db connector which can connect.

    @param base: Base class for test case. Additional base classes
                 will be a DBConnector subclass and unittest.TestCase
    @param suffix: A suffix used to create test case names. Prefixes
                   are defined in the DBConnector subclasses.
    """
    connectors = [PySQLite2Connector, SQLite3Connector, PyPgSQLConnector,
                  PsycopgConnector, MySQLConnector, FirebirdConnector]
    tests = {}
    for connclass in connectors:
        name = connclass.TEST_PREFIX + suffix

        class testcase(connclass, base, unittest.TestCase):
            __module__ = connclass.__module__

        testcase.__name__ = name
        if hasattr(connclass, "__qualname__"):
            testcase.__qualname__ = ".".join(
                connclass.__qualname__.split()[0:-1] + [name])
        tests[name] = testcase

    globals.update(tests)



# PySQLite2Connector SQLite3ADBAPITests PyPgSQLADBAPITests
# PsycopgADBAPITests MySQLADBAPITests FirebirdADBAPITests
makeSQLTests(ADBAPITestBase, 'ADBAPITests', globals())

# PySQLite2Connector SQLite3ReconnectTests PyPgSQLReconnectTests
# PsycopgReconnectTests MySQLReconnectTests FirebirdReconnectTests
makeSQLTests(ReconnectTestBase, 'ReconnectTests', globals())



class FakePool(object):
    """
    A fake L{ConnectionPool} for tests.

    @ivar connectionFactory: factory for making connections returned by the
        C{connect} method.
    @type connectionFactory: any callable
    """
    reconnect = True
    noisy = True

    def __init__(self, connectionFactory):
        self.connectionFactory = connectionFactory


    def connect(self):
        """
        Return an instance of C{self.connectionFactory}.
        """
        return self.connectionFactory()


    def disconnect(self, connection):
        """
        Do nothing.
        """



class ConnectionTests(unittest.TestCase):
    """
    Tests for the L{Connection} class.
    """

    def test_rollbackErrorLogged(self):
        """
        If an error happens during rollback, L{ConnectionLost} is raised but
        the original error is logged.
        """
        class ConnectionRollbackRaise(object):
            def rollback(self):
                raise RuntimeError("problem!")

        pool = FakePool(ConnectionRollbackRaise)
        connection = Connection(pool)
        self.assertRaises(ConnectionLost, connection.rollback)
        errors = self.flushLoggedErrors(RuntimeError)
        self.assertEqual(len(errors), 1)
        self.assertEqual(errors[0].value.args[0], "problem!")



class TransactionTests(unittest.TestCase):
    """
    Tests for the L{Transaction} class.
    """

    def test_reopenLogErrorIfReconnect(self):
        """
        If the cursor creation raises an error in L{Transaction.reopen}, it
        reconnects but log the error occurred.
        """
        class ConnectionCursorRaise(object):
            count = 0

            def reconnect(self):
                pass

            def cursor(self):
                if self.count == 0:
                    self.count += 1
                    raise RuntimeError("problem!")

        pool = FakePool(None)
        transaction = Transaction(pool, ConnectionCursorRaise())
        transaction.reopen()
        errors = self.flushLoggedErrors(RuntimeError)
        self.assertEqual(len(errors), 1)
        self.assertEqual(errors[0].value.args[0], "problem!")



class NonThreadPool(object):
    def callInThreadWithCallback(self, onResult, f, *a, **kw):
        success = True
        try:
            result = f(*a, **kw)
        except Exception:
            success = False
            result = Failure()
        onResult(success, result)



class DummyConnectionPool(ConnectionPool):
    """
    A testable L{ConnectionPool};
    """
    threadpool = NonThreadPool()

    def __init__(self):
        """
        Don't forward init call.
        """
        self._reactor = reactor



class EventReactor(object):
    """
    Partial L{IReactorCore} implementation with simple event-related
    methods.

    @ivar _running: A C{bool} indicating whether the reactor is pretending
        to have been started already or not.

    @ivar triggers: A C{list} of pending system event triggers.
    """
    def __init__(self, running):
        self._running = running
        self.triggers = []


    def callWhenRunning(self, function):
        if self._running:
            function()
        else:
            return self.addSystemEventTrigger('after', 'startup', function)


    def addSystemEventTrigger(self, phase, event, trigger):
        handle = (phase, event, trigger)
        self.triggers.append(handle)
        return handle


    def removeSystemEventTrigger(self, handle):
        self.triggers.remove(handle)



class ConnectionPoolTests(unittest.TestCase):
    """
    Unit tests for L{ConnectionPool}.
    """

    def test_runWithConnectionRaiseOriginalError(self):
        """
        If rollback fails, L{ConnectionPool.runWithConnection} raises the
        original exception and log the error of the rollback.
        """
        class ConnectionRollbackRaise(object):
            def __init__(self, pool):
                pass

            def rollback(self):
                raise RuntimeError("problem!")

        def raisingFunction(connection):
            raise ValueError("foo")

        pool = DummyConnectionPool()
        pool.connectionFactory = ConnectionRollbackRaise
        d = pool.runWithConnection(raisingFunction)
        d = self.assertFailure(d, ValueError)
        def cbFailed(ignored):
            errors = self.flushLoggedErrors(RuntimeError)
            self.assertEqual(len(errors), 1)
            self.assertEqual(errors[0].value.args[0], "problem!")
        d.addCallback(cbFailed)
        return d


    def test_closeLogError(self):
        """
        L{ConnectionPool._close} logs exceptions.
        """
        class ConnectionCloseRaise(object):
            def close(self):
                raise RuntimeError("problem!")

        pool = DummyConnectionPool()
        pool._close(ConnectionCloseRaise())

        errors = self.flushLoggedErrors(RuntimeError)
        self.assertEqual(len(errors), 1)
        self.assertEqual(errors[0].value.args[0], "problem!")


    def test_runWithInteractionRaiseOriginalError(self):
        """
        If rollback fails, L{ConnectionPool.runInteraction} raises the
        original exception and log the error of the rollback.
        """
        class ConnectionRollbackRaise(object):
            def __init__(self, pool):
                pass

            def rollback(self):
                raise RuntimeError("problem!")

        class DummyTransaction(object):
            def __init__(self, pool, connection):
                pass

        def raisingFunction(transaction):
            raise ValueError("foo")

        pool = DummyConnectionPool()
        pool.connectionFactory = ConnectionRollbackRaise
        pool.transactionFactory = DummyTransaction

        d = pool.runInteraction(raisingFunction)
        d = self.assertFailure(d, ValueError)
        def cbFailed(ignored):
            errors = self.flushLoggedErrors(RuntimeError)
            self.assertEqual(len(errors), 1)
            self.assertEqual(errors[0].value.args[0], "problem!")
        d.addCallback(cbFailed)
        return d


    def test_unstartedClose(self):
        """
        If L{ConnectionPool.close} is called without L{ConnectionPool.start}
        having been called, the pool's startup event is cancelled.
        """
        reactor = EventReactor(False)
        pool = ConnectionPool('twisted.test.test_adbapi', cp_reactor=reactor)
        # There should be a startup trigger waiting.
        self.assertEqual(reactor.triggers, [('after', 'startup', pool._start)])
        pool.close()
        # But not anymore.
        self.assertFalse(reactor.triggers)


    def test_startedClose(self):
        """
        If L{ConnectionPool.close} is called after it has been started, but
        not by its shutdown trigger, the shutdown trigger is cancelled.
        """
        reactor = EventReactor(True)
        pool = ConnectionPool('twisted.test.test_adbapi', cp_reactor=reactor)
        # There should be a shutdown trigger waiting.
        self.assertEqual(reactor.triggers,
                         [('during', 'shutdown', pool.finalClose)])
        pool.close()
        # But not anymore.
        self.assertFalse(reactor.triggers)

Youez - 2016 - github.com/yon3zu
LinuXploit