1 /// PostgreSQL database client implementation.
2 module vibe.db.postgresql;
3 
4 public import dpq2: ValueFormat;
5 public import dpq2.exception: Dpq2Exception;
6 public import dpq2.result;
7 public import dpq2.connection: ConnectionException, connStringCheck, ConnectionStart, CancellationException;
8 public import dpq2.args;
9 public import derelict.pq.pq;
10 
11 import vibe.core.connectionpool: ConnectionPool, VibeLockedConnection = LockedConnection;
12 import vibe.core.log;
13 import core.time: Duration, dur;
14 import std.exception: enforce;
15 import std.conv: to;
16 
/// Settings used by the default connection factory to create a Connection.
struct ClientSettings
{
    string connString; /// PostgreSQL connection string
    /// Optional callback; Dpq2Connection calls it (when not null) at the end of
    /// its constructor and again at the end of each successful reset()
    void delegate(Connection) afterStartConnectOrReset;
}
23 
/// A Postgres client with connection pooling.
class PostgresClient
{
    private ConnectionPool!Connection pool;

    /**
     * Creates a client whose connections are built by the default factory.
     *
     * Params:
     *      connString = PostgreSQL connection string (validated before the pool is created)
     *      connNum = connection limit handed to the underlying ConnectionPool
     *      afterStartConnectOrReset = optional callback stored in the ClientSettings
     *          given to every created connection; called after connection established
     */
    this(
        string connString,
        uint connNum,
        void delegate(Connection) afterStartConnectOrReset = null
    )
    {
        immutable cs = ClientSettings(
            connString,
            afterStartConnectOrReset
        );

        this(&createConnection, cs, connNum);
    }

    /**
     * Useful for external Connection implementation.
     *
     * Params:
     *      connFactory = called with `cs` every time the pool needs a new connection
     *      cs = settings forwarded to `connFactory`; its connection string is validated here
     *      connNum = connection limit handed to the underlying ConnectionPool
     */
    this
    (
        Connection delegate(in ClientSettings) @safe connFactory,
        immutable ClientSettings cs,
        uint connNum,
    )
    {
        // Validate the string first so that a malformed connection string is
        // still the first error reported to the caller.
        cs.connString.connStringCheck;
        enforceThreadSafeLibpq();

        pool = new ConnectionPool!Connection(() @safe { return connFactory(cs); }, connNum);
    }

    /**
     * Useful for external Connection implementation.
     *
     * Does not check any connection string: the factory is fully responsible
     * for how connections are created.
     *
     * Params:
     *      connFactory = called every time the pool needs a new connection
     *      connNum = connection limit handed to the underlying ConnectionPool
     */
    this(Connection delegate() const pure @safe connFactory, uint connNum)
    {
        enforceThreadSafeLibpq();

        pool = new ConnectionPool!Connection(
                () @safe { return connFactory(); },
                connNum
            );
    }

    /// Get connection from the pool.
    ///
    /// Do not forget to call .reset() for connection if ConnectionException
    /// was caught while using LockedConnection!
    LockedConnection lockConnection()
    {
        logDebugV("get connection from the pool");

        return pool.lockConnection();
    }

    /// Use connection from the pool.
    ///
    /// Same as lockConnection, but if the delegate throws ConnectionException
    /// the connection's .reset() is called before the exception is rethrown.
    ///
    /// Returns: Value returned by delegate or void
    T pickConnection(T)(scope T delegate(scope LockedConnection conn) dg)
    {
        logDebugV("get connection from the pool");
        scope conn = pool.lockConnection();

        try
            return dg(conn);
        catch(ConnectionException e)
        {
            conn.reset(); // also may throw ConnectionException and this is normal behaviour

            throw e;
        }
    }

    /// Default connection factory: builds a Connection from the given settings.
    private Connection createConnection(in ClientSettings cs) @safe
    {
        return new Connection(cs);
    }

    /// Shared constructor check: fails unless PQisthreadsafe() reports 1.
    private static void enforceThreadSafeLibpq()
    {
        enforce(PQisthreadsafe() == 1, "libpq is not thread-safe: PQisthreadsafe() did not return 1");
    }
}
108 
/// Connection type held in the PostgresClient pool
alias Connection = Dpq2Connection;

/// Pool-locked connection handle, the type returned by PostgresClient.lockConnection
alias LockedConnection = VibeLockedConnection!Connection;
113 
/**
 * dpq2.Connection adapted for use with Vibe.d
 *
 * Statements are sent through the inherited `send*` methods; the code then
 * waits on a vibe-core file descriptor event created for the connection
 * socket and collects results with getResult().
 */
class Dpq2Connection : dpq2.Connection
{
    /// Timeout for each socket wait made while polling or resetting the connection
    Duration socketTimeout = dur!"seconds"(10);
    /// Timeout for each socket wait made while waiting for a statement's results
    Duration statementTimeout = dur!"seconds"(30);

    private const ClientSettings settings;

    /**
     * Opens a connection using `settings.connString`, sets the client
     * encoding to UTF8 and then calls `settings.afterStartConnectOrReset`
     * (if it is not null) with this connection.
     */
    this(const ref ClientSettings settings) @trusted
    {
        this.settings = settings;

        super(settings.connString);
        setClientEncoding("UTF8"); // TODO: do only if it is different from UTF8

        logDebugV("creating new connection, delegate isNull="~(settings.afterStartConnectOrReset is null).to!string);

        if(settings.afterStartConnectOrReset !is null)
            settings.afterStartConnectOrReset(this);
    }

    /**
     * Blocks while connection will be established or exception thrown
     *
     * After a successful reset `settings.afterStartConnectOrReset` is called
     * again (if it is not null).
     *
     * Throws: ConnectionException if the connection status becomes CONNECTION_BAD
     */
    void reset()
    {
        super.resetStart;

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this);

            if(resetPoll() != PGRES_POLLING_OK)
            {
                // The result of wait() is not inspected here: whether the
                // event fired or timed out, status and resetPoll() are
                // simply checked again on the next iteration.
                socketEvent().wait(socketTimeout);
                continue;
            }

            break;
        }

        if(settings.afterStartConnectOrReset !is null)
            settings.afterStartConnectOrReset(this);
    }

    /// Creates a vibe-core read-trigger event for a duplicate of the connection socket.
    private auto socketEvent()
    {
        import vibe.core.core;

        version(Posix)
        {
            import core.sys.posix.fcntl;
            assert((fcntl(this.posixSocket, F_GETFL, 0) & O_NONBLOCK), "Socket assumed to be non-blocking already");
        }

        // vibe-core right now supports only read trigger event
        // it also closes the socket on scope exit, thus a socket duplication here
        return createFileDescriptorEvent(this.posixSocketDuplicate, FileDescriptorEvent.Trigger.read);
    }

    /**
     * Waits for read events on the socket and calls consumeInput() after each
     * one, repeating while the connection reports isBusy.
     *
     * Params:
     *      timeout = limit applied to each individual wait, not to the whole loop
     *
     * Throws: PostgresClientTimeoutException if a single wait exceeds `timeout`
     */
    private void waitEndOfReadAndConsume(in Duration timeout)
    {
        auto event = socketEvent();

        do
        {
            if(!event.wait(timeout))
                throw new PostgresClientTimeoutException(__FILE__, __LINE__);

            consumeInput();
        }
        while (this.isBusy); // wait until PQgetresult won't block anymore
    }

    /**
     * Polls until the connection reports PGRES_POLLING_OK, then runs the
     * given delegate.
     *
     * Throws: ConnectionException if the connection status is CONNECTION_BAD
     */
    private void doQuery(void delegate() doesQueryAndCollectsResults)
    {
        // Try to get usable connection and send SQL command
        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() != PGRES_POLLING_OK)
            {
                waitEndOfReadAndConsume(socketTimeout);
                continue;
            }
            else
            {
                break;
            }
        }

        logDebugV("doesQuery() call");
        doesQueryAndCollectsResults();
    }

    /**
     * Runs a statement that must produce exactly one Result and returns it.
     *
     * Params:
     *      sendsStatementDg = delegate that sends the statement
     */
    private immutable(Result) runStatementBlockingManner(void delegate() sendsStatementDg)
    {
        immutable(Result)[] res;

        runStatementBlockingMannerWithMultipleResults(sendsStatementDg, (r){ res ~= r; }, false);

        enforce(res.length == 1, "Simple query without row-by-row mode can return only one Result instance, not "~res.length.to!string);

        return res[0];
    }

    /**
     * Sends a statement, waits for the server and passes every received
     * Result to `processResult`.
     *
     * Params:
     *      sendsStatementDg = delegate that sends the statement
     *      processResult = called once for each Result fetched with getResult()
     *      isRowByRowMode = when true, single row mode is requested right
     *          after sending, and remaining results are drained on failure
     *
     * Throws: ConnectionException if the status is CONNECTION_BAD while
     *         results are collected; PostgresClientTimeoutException when
     *         waiting for results times out (cancel() is called first)
     */
    private void runStatementBlockingMannerWithMultipleResults(void delegate() sendsStatementDg, void delegate(immutable(Result)) processResult, bool isRowByRowMode)
    {
        logDebugV(__FUNCTION__);

        doQuery(()
            {
                sendsStatementDg();

                if(isRowByRowMode)
                {
                    enforce(setSingleRowMode, "Failed to set row-by-row mode");
                }

                // Scope guards run in reverse order of declaration: the
                // scope(exit) block below collects results first, and this
                // scope(failure) block runs after it when an exception
                // is propagating.
                scope(failure)
                {
                    if(isRowByRowMode)
                        while(getResult() !is null){} // autoclean of results queue
                }

                scope(exit)
                {
                    logDebugV("consumeInput()");
                    consumeInput(); // TODO: redundant call (also called in waitEndOfRead) - can be moved into catch block?

                    while(true)
                    {
                        auto r = getResult();

                        /*
                         I am trying to check connection status with PostgreSQL server
                         with PQstatus and it always always return CONNECTION_OK even
                         when the cable to the server is unplugged.
                                                    – user1972556 (stackoverflow.com)

                         ...the idea of testing connections is fairly silly, since the
                         connection might die between when you test it and when you run
                         your "real" query. Don't test connections, just use them, and
                         if they fail be prepared to retry everything since you opened
                         the transaction. – Craig Ringer Jan 14 '13 at 2:59
                         */
                        if(status == CONNECTION_BAD)
                            throw new ConnectionException(this, __FILE__, __LINE__);

                        if(r is null) break;

                        processResult(r);
                    }
                }

                try
                {
                    waitEndOfReadAndConsume(statementTimeout);
                }
                catch(PostgresClientTimeoutException e)
                {
                    logDebugV("Exceeded Posgres query time limit");

                    try
                        cancel(); // cancel sql query
                    catch(CancellationException ce) // means successful cancellation
                        e.msg ~= ", "~ce.msg;

                    throw e;
                }
            }
        );
    }

    /**
     * Executes an SQL command given as a string.
     *
     * Params:
     *      sqlCommand = SQL text to execute
     *      resultFormat = format requested for the returned values
     */
    immutable(Answer) execStatement(
        string sqlCommand,
        ValueFormat resultFormat = ValueFormat.BINARY
    )
    {
        QueryParams p;
        p.resultFormat = resultFormat;
        p.sqlCommand = sqlCommand;

        return execStatement(p);
    }

    /// Executes the statement described by `params` (sent with sendQueryParams).
    immutable(Answer) execStatement(in ref QueryParams params)
    {
        auto res = runStatementBlockingManner({ sendQueryParams(params); });

        return res.getAnswer;
    }

    /// Row-by-row version of execStatement
    ///
    /// Delegate called for each received row.
    ///
    /// More info: https://www.postgresql.org/docs/current/libpq-single-row-mode.html
    ///
    void execStatementRbR(
        string sqlCommand,
        void delegate(immutable(Row)) answerRowProcessDg,
        ValueFormat resultFormat = ValueFormat.BINARY
    )
    {
        QueryParams p;
        p.resultFormat = resultFormat;
        p.sqlCommand = sqlCommand;

        execStatementRbR(p, answerRowProcessDg);
    }

    /// Ditto
    void execStatementRbR(in ref QueryParams params, void delegate(immutable(Row)) answerRowProcessDg)
    {
        runStatementWithRowByRowResult(
            { sendQueryParams(params); },
            answerRowProcessDg
        );
    }

    /**
     * Runs a statement in row-by-row mode and hands each received row to
     * `answerRowProcessDg`.
     *
     * Every Result must contain 0 or 1 rows; a Result with one row must have
     * status PGRES_SINGLE_TUPLE.
     */
    private void runStatementWithRowByRowResult(void delegate() sendsStatementDg, void delegate(immutable(Row)) answerRowProcessDg)
    {
        runStatementBlockingMannerWithMultipleResults(
                sendsStatementDg,
                (r)
                {
                    auto answer = r.getAnswer;

                    enforce(answer.length <= 1, `0 or 1 rows can be received, not `~answer.length.to!string);

                    if(answer.length == 1)
                    {
                        enforce(r.status == PGRES_SINGLE_TUPLE, `Wrong result status: `~r.status.to!string);

                        answerRowProcessDg(answer[0]);
                    }
                },
                true
            );
    }

    /**
     * Prepares a named statement (sent with sendPrepare).
     *
     * Throws: ResponseException if the result status is not PGRES_COMMAND_OK
     */
    void prepareStatement(
        string statementName,
        string sqlStatement,
        Oid[] oids = null
    )
    {
        auto r = runStatementBlockingManner(
                {sendPrepare(statementName, sqlStatement, oids);}
            );

        if(r.status != PGRES_COMMAND_OK)
            throw new ResponseException(r, __FILE__, __LINE__);
    }

    /// Executes a prepared statement described by `params` (sent with sendQueryPrepared).
    immutable(Answer) execPreparedStatement(in ref QueryParams params)
    {
        auto res = runStatementBlockingManner({ sendQueryPrepared(params); });

        return res.getAnswer;
    }

    /// Requests a description of the named prepared statement (sent with sendDescribePrepared).
    immutable(Answer) describePreparedStatement(string preparedStatementName)
    {
        auto res = runStatementBlockingManner({ sendDescribePrepared(preparedStatementName); });

        return res.getAnswer;
    }

    /**
     * Waits for the next notification.
     *
     * An already received notification is returned immediately; otherwise the
     * method waits on the connection socket event for up to `timeout`.
     *
     * Params:
     *      timeout = maximal duration to wait for the new Notify to be received
     *
     * Returns: New Notify or null when no other notification is available or timeout occurs.
     * Throws: ConnectionException on connection failure
     */
    Notify waitForNotify(in Duration timeout = Duration.max)
    {
        // try read available
        auto ntf = getNextNotify();
        if (ntf !is null) return ntf;

        // wait for next one
        try waitEndOfReadAndConsume(timeout);
        catch (PostgresClientTimeoutException) return null;
        return getNextNotify();
    }
}
416 
/// Thrown when waiting for a read event on the connection socket exceeds the given timeout.
class PostgresClientTimeoutException : Dpq2Exception
{
    /**
     * Params:
     *      file = source file reported by the exception, the call site by default
     *      line = source line reported by the exception, the call site by default
     */
    this(string file = __FILE__, size_t line = __LINE__)
    {
        super("Exceeded Posgres query time limit", file, line);
    }
}
425 
unittest
{
    import std.exception: assertThrown;

    // A malformed connection string must be rejected when the client is constructed
    assertThrown!ConnectionException(new PostgresClient("wrong connect string", 2));
}
439 
/// Integration test, compiled only with version IntegrationTest.
///
/// Creates a client with a connection limit of 3 for the given connection
/// string and exercises statement execution, row-by-row mode, prepared
/// statements, concurrent use from several tasks and LISTEN/NOTIFY.
version(IntegrationTest) void __integration_test(string connString)
{
    setLogLevel = LogLevel.debugV;

    auto client = new PostgresClient(connString, 3);

    // This connection stays locked for the whole test
    auto conn = client.lockConnection;
    {
        // Plain statement given as a string, binary result format
        auto res = conn.execStatement(
            "SELECT 123::integer, 567::integer, 'asd fgh'::text",
            ValueFormat.BINARY
        );

        assert(res.getAnswer[0][1].as!PGinteger == 567);
    }

    {
        // Row-by-row result receiving
        int[] res;

        conn.execStatementRbR(
            `SELECT generate_series(0, 3) as i, pg_sleep(0.2)`,
            (immutable(Row) r)
            {
                res ~= r[0].as!int;
            }
        );

        // generate_series(0, 3) yields four rows, one delegate call each
        assert(res.length == 4);
    }

    {
        // Row-by-row result receiving: error while receiving
        size_t rowCounter;

        QueryParams p;
        p.sqlCommand =
            `SELECT 1.0 / (generate_series(1, 100000) % 80000)`; // division by zero error at generate_series=80000

        bool assertThrown;

        try
            conn.execStatementRbR(p,
                (immutable(Row) r)
                {
                    rowCounter++;
                }
            );
        catch(ResponseException) // catches ERROR:  division by zero
            assertThrown = true;

        // Some rows must have been delivered before the error was reported
        assert(assertThrown);
        assert(rowCounter > 0);
    }

    {
        // Statement given as QueryParams
        QueryParams p;
        p.sqlCommand = `SELECT 123`;

        auto res = conn.execStatement(p);

        assert(res.length == 1);
        assert(res[0][0].as!int == 123);
    }

    {
        // Preparing a valid statement succeeds; "stmnt_name" is reused by the blocks below
        conn.prepareStatement("stmnt_name", "SELECT 123::integer");

        bool throwFlag = false;

        // Preparing invalid SQL must raise ResponseException
        try
            conn.prepareStatement("wrong_stmnt", "WRONG SQL STATEMENT");
        catch(ResponseException)
            throwFlag = true;

        assert(throwFlag);
    }

    {
        // Describe the statement prepared above
        import dpq2.oids: OidType;

        auto a = conn.describePreparedStatement("stmnt_name");

        assert(a.nParams == 0);
        assert(a.OID(0) == OidType.Int4);
    }

    {
        // Execute the statement prepared above
        QueryParams p;
        p.preparedStatementName = "stmnt_name";

        auto r = conn.execPreparedStatement(p);

        assert(r.getAnswer[0][0].as!PGinteger == 123);
    }

    {
        // Fibers test
        // Two async tasks each pick their own connection from the pool while
        // `conn` is still locked by this function.
        import vibe.core.concurrency;

        auto future0 = async({
            client.pickConnection(
                (scope c)
                {
                    immutable answer = c.execStatement("SELECT 'New connection 0'");
                }
            );

            return 1;
        });

        auto future1 = async({
            client.pickConnection(
                (scope c)
                {
                    immutable answer = c.execStatement("SELECT 'New connection 1'");
                }
            );

            return 1;
        });

        immutable answer = conn.execStatement("SELECT 'Old connection'");

        assert(future0 == 1);
        assert(future1 == 1);
        assert(answer.length == 1);
    }

    {
        // Identifier escaping provided by the connection
        assert(conn.escapeIdentifier("abc") == "\"abc\"");
    }

    {
        // LISTEN/NOTIFY: an async task listens on channel "foo" using another
        // pooled connection, then `conn` sends the notification.
        import core.time : msecs;
        import vibe.core.core : sleep;
        import vibe.core.concurrency : async;

        struct NTF {string name; string extra;}

        auto futureNtf = async({
            Notify pgNtf;

            client.pickConnection(
                (scope c)
                {
                    c.execStatement("LISTEN foo");
                    pgNtf = c.waitForNotify();
                }
            );

            assert(pgNtf !is null);
            return NTF(pgNtf.name.idup, pgNtf.extra.idup);
        });

        // NOTE(review): presumably gives the listener task time to issue
        // LISTEN before NOTIFY is sent; this is timing-dependent - confirm
        sleep(10.msecs);
        conn.execStatement("NOTIFY foo, 'bar'");

        assert(futureNtf.name == "foo");
        assert(futureNtf.extra == "bar");
    }
}