/// PostgreSQL database client implementation.
module vibe.db.postgresql;

public import dpq2: ValueFormat;
public import dpq2.exception: Dpq2Exception;
public import dpq2.result;
public import dpq2.connection: ConnectionException, connStringCheck, ConnectionStart, CancellationException;
public import dpq2.args;
public import derelict.pq.pq;

import vibe.core.connectionpool: ConnectionPool, VibeLockedConnection = LockedConnection;
import vibe.core.log;
import core.time: Duration, dur;
import std.exception: enforce;
import std.conv: to;

/// Settings handed to every connection created for a PostgresClient pool.
struct ClientSettings
{
    string connString; /// PostgreSQL connection string
    void delegate(Connection) afterStartConnectOrReset; /// called after connection established
}

/// A Postgres client with connection pooling.
class PostgresClient
{
    private ConnectionPool!Connection pool;

    /**
     * Creates a client whose pool is filled with Dpq2Connection instances.
     *
     * Params:
     *   connString = PostgreSQL connection string
     *   connNum = passed to the connection pool as its connection count
     *   afterStartConnectOrReset = called after a connection is established
     *                              or reset; may be null
     */
    this(
        string connString,
        uint connNum,
        void delegate(Connection) afterStartConnectOrReset = null
    )
    {
        immutable cs = ClientSettings(
            connString,
            afterStartConnectOrReset
        );

        this(&createConnection, cs, connNum);
    }

    /**
     * Useful for external Connection implementation.
     *
     * Validates the connection string and checks that libpq is thread-safe
     * before the pool is created.
     *
     * Params:
     *   connFactory = creates a connection from the given settings
     *   cs = settings passed to connFactory on every call
     *   connNum = passed to the connection pool as its connection count
     */
    this
    (
        Connection delegate(in ClientSettings) @safe connFactory,
        immutable ClientSettings cs,
        uint connNum,
    )
    {
        // Checked first so that a malformed connection string is still
        // reported as ConnectionException.
        cs.connString.connStringCheck;

        // Same requirement the factory-only constructor enforces.
        enforce(PQisthreadsafe() == 1);

        pool = new ConnectionPool!Connection(() @safe { return connFactory(cs); }, connNum);
    }

    /**
     * Useful for external Connection implementation.
     *
     * Does not check any connection string: the factory is fully
     * responsible for how connections are created.
     *
     * Params:
     *   connFactory = creates a connection
     *   connNum = passed to the connection pool as its connection count
     */
    this(Connection delegate() const pure @safe connFactory, uint connNum)
    {
        enforce(PQisthreadsafe() == 1);

        pool = new ConnectionPool!Connection(
            () @safe { return connFactory(); },
            connNum
        );
    }

    /// Get connection from the pool.
    ///
    /// Do not forget to call .reset() on the connection if a
    /// ConnectionException was caught while using the LockedConnection!
    LockedConnection lockConnection()
    {
        logDebugV("get connection from the pool");

        return pool.lockConnection();
    }

    /// Use connection from the pool.
    ///
    /// Same as lockConnection, but when the delegate throws a
    /// ConnectionException it starts re-establishing the connection by
    /// calling .reset() and then rethrows the exception.
    ///
    /// Returns: Value returned by delegate or void
    T pickConnection(T)(scope T delegate(scope LockedConnection conn) dg)
    {
        logDebugV("get connection from the pool");
        scope conn = pool.lockConnection();

        try
        {
            return dg(conn);
        }
        catch(ConnectionException e)
        {
            conn.reset(); // also may throw ConnectionException and this is normal behaviour

            throw e;
        }
    }

    /// Default connection factory used by the connection-string constructor.
    private Connection createConnection(in ClientSettings cs) @safe
    {
        return new Connection(cs);
    }
}

/// Connection type stored in the pool.
alias Connection = Dpq2Connection;

/// Pool-locked handle to a Connection.
alias LockedConnection = VibeLockedConnection!Connection;

/**
 * dpq2.Connection adapted for use with Vibe.d
 */
class Dpq2Connection : dpq2.Connection
{
    /// Timeout used while waiting on the socket during reset() and while
    /// polling the connection before a statement is sent.
    Duration socketTimeout = dur!"seconds"(10);

    /// Timeout used while waiting for the results of a sent statement.
    Duration statementTimeout = dur!"seconds"(30);

    private const ClientSettings settings;

    /// Opens the connection described by settings, sets the client encoding
    /// to UTF8 and calls settings.afterStartConnectOrReset if it is not null.
    this(const ref ClientSettings settings) @trusted
    {
        this.settings = settings;

        super(settings.connString);
        setClientEncoding("UTF8"); // TODO: do only if it is different from UTF8

        logDebugV("creating new connection, delegate isNull="~(settings.afterStartConnectOrReset is null).to!string);

        if(settings.afterStartConnectOrReset !is null)
            settings.afterStartConnectOrReset(this);
    }

    /// Blocks until the connection is re-established or an exception is thrown.
    ///
    /// Calls settings.afterStartConnectOrReset (if not null) once the reset
    /// has completed.
    ///
    /// Throws: ConnectionException if the connection status becomes CONNECTION_BAD
    void reset()
    {
        super.resetStart;

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this);

            if(resetPoll() != PGRES_POLLING_OK)
            {
                // The result of wait() is not inspected: after the wait the
                // loop simply re-checks the status and polls again.
                socketEvent().wait(socketTimeout);
                continue;
            }

            break;
        }

        if(settings.afterStartConnectOrReset !is null)
            settings.afterStartConnectOrReset(this);
    }

    /// Creates a read-trigger file descriptor event for a duplicate of the
    /// connection socket.
    private auto socketEvent()
    {
        import vibe.core.core;

        version(Posix)
        {
            import core.sys.posix.fcntl;
            assert((fcntl(this.posixSocket, F_GETFL, 0) & O_NONBLOCK), "Socket assumed to be non-blocking already");
        }

        // vibe-core right now supports only read trigger event
        // it also closes the socket on scope exit, thus a socket duplication here
        return createFileDescriptorEvent(this.posixSocketDuplicate, FileDescriptorEvent.Trigger.read);
    }

    /// Waits for socket read events and consumes input until the connection
    /// is no longer busy.
    ///
    /// Throws: PostgresClientTimeoutException if a single wait exceeds timeout
    private void waitEndOfReadAndConsume(in Duration timeout)
    {
        auto event = socketEvent();

        do
        {
            if(!event.wait(timeout))
                throw new PostgresClientTimeoutException(__FILE__, __LINE__);

            consumeInput();
        }
        while (this.isBusy); // wait until PQgetresult won't block anymore
    }

    /// Polls until the connection is usable, then calls the delegate, which
    /// is expected to send the statement and collect its results.
    ///
    /// Throws: ConnectionException if the connection status is CONNECTION_BAD
    private void doQuery(void delegate() doesQueryAndCollectsResults)
    {
        // Try to get usable connection and send SQL command
        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() != PGRES_POLLING_OK)
            {
                waitEndOfReadAndConsume(socketTimeout);
                continue;
            }
            else
            {
                break;
            }
        }

        logDebugV("doesQuery() call");
        doesQueryAndCollectsResults();
    }

    /// Runs a statement that must produce exactly one Result and returns it.
    private immutable(Result) runStatementBlockingManner(void delegate() sendsStatementDg)
    {
        immutable(Result)[] res;

        runStatementBlockingMannerWithMultipleResults(sendsStatementDg, (r){ res ~= r; }, false);

        enforce(res.length == 1, "Simple query without row-by-row mode can return only one Result instance, not "~res.length.to!string);

        return res[0];
    }

    /// Sends a statement and passes every received Result to processResult.
    ///
    /// Params:
    ///   sendsStatementDg = sends the statement on this connection
    ///   processResult = called for each Result taken from the results queue
    ///   isRowByRowMode = if true, single-row mode is enabled after sending
    private void runStatementBlockingMannerWithMultipleResults(void delegate() sendsStatementDg, void delegate(immutable(Result)) processResult, bool isRowByRowMode)
    {
        logDebugV(__FUNCTION__);

        doQuery(()
            {
                sendsStatementDg();

                if(isRowByRowMode)
                {
                    enforce(setSingleRowMode, "Failed to set row-by-row mode");
                }

                // Scope guards run in reverse lexical order: the scope(exit)
                // below runs first, then this scope(failure) drains whatever
                // is left in the results queue.
                scope(failure)
                {
                    if(isRowByRowMode)
                        while(getResult() !is null){} // autoclean of results queue
                }

                scope(exit)
                {
                    logDebugV("consumeInput()");
                    consumeInput(); // TODO: redundant call (also called in waitEndOfRead) - can be moved into catch block?

                    while(true)
                    {
                        auto r = getResult();

                        /*
                         I am trying to check connection status with PostgreSQL server
                         with PQstatus and it always always return CONNECTION_OK even
                         when the cable to the server is unplugged.
                                             – user1972556 (stackoverflow.com)

                         ...the idea of testing connections is fairly silly, since the
                         connection might die between when you test it and when you run
                         your "real" query. Don't test connections, just use them, and
                         if they fail be prepared to retry everything since you opened
                         the transaction. – Craig Ringer Jan 14 '13 at 2:59
                         */
                        if(status == CONNECTION_BAD)
                            throw new ConnectionException(this, __FILE__, __LINE__);

                        if(r is null) break;

                        processResult(r);
                    }
                }

                try
                {
                    waitEndOfReadAndConsume(statementTimeout);
                }
                catch(PostgresClientTimeoutException e)
                {
                    logDebugV("Exceeded Posgres query time limit");

                    try
                        cancel(); // cancel sql query
                    catch(CancellationException ce) // means successful cancellation
                        e.msg ~= ", "~ce.msg;

                    throw e;
                }
            }
        );
    }

    /// Executes an SQL command and returns its answer.
    ///
    /// Params:
    ///   sqlCommand = SQL text to execute
    ///   resultFormat = requested format of the returned values
    immutable(Answer) execStatement(
        string sqlCommand,
        ValueFormat resultFormat = ValueFormat.BINARY
    )
    {
        QueryParams p;
        p.resultFormat = resultFormat;
        p.sqlCommand = sqlCommand;

        return execStatement(p);
    }

    /// Ditto
    immutable(Answer) execStatement(in ref QueryParams params)
    {
        auto res = runStatementBlockingManner({ sendQueryParams(params); });

        return res.getAnswer;
    }

    /// Row-by-row version of execStatement
    ///
    /// Delegate called for each received row.
    ///
    /// More info: https://www.postgresql.org/docs/current/libpq-single-row-mode.html
    ///
    void execStatementRbR(
        string sqlCommand,
        void delegate(immutable(Row)) answerRowProcessDg,
        ValueFormat resultFormat = ValueFormat.BINARY
    )
    {
        QueryParams p;
        p.resultFormat = resultFormat;
        p.sqlCommand = sqlCommand;

        execStatementRbR(p, answerRowProcessDg);
    }

    /// Ditto
    void execStatementRbR(in ref QueryParams params, void delegate(immutable(Row)) answerRowProcessDg)
    {
        runStatementWithRowByRowResult(
            { sendQueryParams(params); },
            answerRowProcessDg
        );
    }

    /// Runs a statement in row-by-row mode and passes each received row to
    /// answerRowProcessDg; results without rows are skipped.
    private void runStatementWithRowByRowResult(void delegate() sendsStatementDg, void delegate(immutable(Row)) answerRowProcessDg)
    {
        runStatementBlockingMannerWithMultipleResults(
            sendsStatementDg,
            (r)
            {
                auto answer = r.getAnswer;

                enforce(answer.length <= 1, `0 or 1 rows can be received, not `~answer.length.to!string);

                if(answer.length == 1)
                {
                    enforce(r.status == PGRES_SINGLE_TUPLE, `Wrong result status: `~r.status.to!string);

                    answerRowProcessDg(answer[0]);
                }
            },
            true
        );
    }

    /// Prepares a named statement.
    ///
    /// Throws: ResponseException if the result status is not PGRES_COMMAND_OK
    void prepareStatement(
        string statementName,
        string sqlStatement,
        Oid[] oids = null
    )
    {
        auto r = runStatementBlockingManner(
            {sendPrepare(statementName, sqlStatement, oids);}
        );

        if(r.status != PGRES_COMMAND_OK)
            throw new ResponseException(r, __FILE__, __LINE__);
    }

    /// Executes a previously prepared statement and returns its answer.
    immutable(Answer) execPreparedStatement(in ref QueryParams params)
    {
        auto res = runStatementBlockingManner({ sendQueryPrepared(params); });

        return res.getAnswer;
    }

    /// Requests the description of a prepared statement and returns it as an answer.
    immutable(Answer) describePreparedStatement(string preparedStatementName)
    {
        auto res = runStatementBlockingManner({ sendDescribePrepared(preparedStatementName); });

        return res.getAnswer;
    }

    /**
     * Non blocking method to wait for next notification.
     *
     * Params:
     *   timeout = maximal duration to wait for the new Notify to be received
     *
     * Returns: New Notify or null when no other notification is available or timeout occurs.
     * Throws: ConnectionException on connection failure
     */
    Notify waitForNotify(in Duration timeout = Duration.max)
    {
        // try read available
        auto ntf = getNextNotify();
        if (ntf !is null) return ntf;

        // wait for next one
        try waitEndOfReadAndConsume(timeout);
        catch (PostgresClientTimeoutException) return null;
        return getNextNotify();
    }
}

/// Thrown when waiting for data from the server exceeds the given timeout.
class PostgresClientTimeoutException : Dpq2Exception
{
    this(string file, size_t line)
    {
        super("Exceeded Posgres query time limit", file, line);
    }
}

unittest
{
    bool raised = false;

    try
    {
        auto client = new PostgresClient("wrong connect string", 2);
    }
    catch(ConnectionException e)
        raised = true;

    assert(raised);
}

version(IntegrationTest) void __integration_test(string connString)
{
    setLogLevel = LogLevel.debugV;

    auto client = new PostgresClient(connString, 3);

    auto conn = client.lockConnection;
    {
        auto res = conn.execStatement(
            "SELECT 123::integer, 567::integer, 'asd fgh'::text",
            ValueFormat.BINARY
        );

        assert(res.getAnswer[0][1].as!PGinteger == 567);
    }

    {
        // Row-by-row result receiving
        int[] res;

        conn.execStatementRbR(
            `SELECT generate_series(0, 3) as i, pg_sleep(0.2)`,
            (immutable(Row) r)
            {
                res ~= r[0].as!int;
            }
        );

        assert(res.length == 4);
    }

    {
        // Row-by-row result receiving: error while receiving
        size_t rowCounter;

        QueryParams p;
        p.sqlCommand =
            `SELECT 1.0 / (generate_series(1, 100000) % 80000)`; // division by zero error at generate_series=80000

        bool assertThrown;

        try
            conn.execStatementRbR(p,
                (immutable(Row) r)
                {
                    rowCounter++;
                }
            );
        catch(ResponseException) // catches ERROR:  division by zero
            assertThrown = true;

        assert(assertThrown);
        assert(rowCounter > 0);
    }

    {
        QueryParams p;
        p.sqlCommand = `SELECT 123`;

        auto res = conn.execStatement(p);

        assert(res.length == 1);
        assert(res[0][0].as!int == 123);
    }

    {
        conn.prepareStatement("stmnt_name", "SELECT 123::integer");

        bool throwFlag = false;

        try
            conn.prepareStatement("wrong_stmnt", "WRONG SQL STATEMENT");
        catch(ResponseException)
            throwFlag = true;

        assert(throwFlag);
    }

    {
        import dpq2.oids: OidType;

        auto a = conn.describePreparedStatement("stmnt_name");

        assert(a.nParams == 0);
        assert(a.OID(0) == OidType.Int4);
    }

    {
        QueryParams p;
        p.preparedStatementName = "stmnt_name";

        auto r = conn.execPreparedStatement(p);

        assert(r.getAnswer[0][0].as!PGinteger == 123);
    }

    {
        // Fibers test
        import vibe.core.concurrency;

        auto future0 = async({
            client.pickConnection(
                (scope c)
                {
                    immutable answer = c.execStatement("SELECT 'New connection 0'");
                }
            );

            return 1;
        });

        auto future1 = async({
            client.pickConnection(
                (scope c)
                {
                    immutable answer = c.execStatement("SELECT 'New connection 1'");
                }
            );

            return 1;
        });

        immutable answer = conn.execStatement("SELECT 'Old connection'");

        assert(future0 == 1);
        assert(future1 == 1);
        assert(answer.length == 1);
    }

    {
        assert(conn.escapeIdentifier("abc") == "\"abc\"");
    }

    {
        import core.time : msecs;
        import vibe.core.core : sleep;
        import vibe.core.concurrency : async;

        struct NTF {string name; string extra;}

        auto futureNtf = async({
            Notify pgNtf;

            client.pickConnection(
                (scope c)
                {
                    c.execStatement("LISTEN foo");
                    pgNtf = c.waitForNotify();
                }
            );

            assert(pgNtf !is null);
            return NTF(pgNtf.name.idup, pgNtf.extra.idup);
        });

        sleep(10.msecs);
        conn.execStatement("NOTIFY foo, 'bar'");

        assert(futureNtf.name == "foo");
        assert(futureNtf.extra == "bar");
    }
}