1 /// PostgreSQL database client implementation.
2 module vibe.db.postgresql;
3 
4 public import dpq2: ValueFormat;
5 public import dpq2.exception: Dpq2Exception;
6 public import dpq2.result;
7 public import dpq2.connection: ConnectionException, connStringCheck, ConnectionStart, CancellationException;
8 public import dpq2.args;
9 public import derelict.pq.pq;
10 
11 import vibe.core.connectionpool: ConnectionPool, VibeLockedConnection = LockedConnection;
12 import vibe.core.log;
13 import core.time: Duration, dur;
14 import std.exception: enforce;
15 import std.conv: to;
16 
17 ///
18 struct ClientSettings
19 {
20     string connString; ///
21     void delegate(Connection) afterStartConnectOrReset; ///
22 }
23 
24 /// A Postgres client with connection pooling.
25 class PostgresClient
26 {
27     private ConnectionPool!Connection pool;
28 
29     ///
30     this(
31         string connString,
32         uint connNum,
33         void delegate(Connection) afterStartConnectOrReset = null
34     )
35     {
36         immutable cs = ClientSettings(
37             connString,
38             afterStartConnectOrReset
39         );
40 
41         this(&createConnection, cs, connNum);
42     }
43 
44     ///
45     this
46     (
47         Connection delegate(in ClientSettings) @safe connFactory,
48         immutable ClientSettings cs,
49         uint connNum,
50     )
51     {
52         cs.connString.connStringCheck;
53 
54         pool = new ConnectionPool!Connection(() @safe { return connFactory(cs); }, connNum);
55     }
56 
57     ///
58     this(Connection delegate() const pure @safe connFactory, uint connNum)
59     {
60         enforce(PQisthreadsafe() == 1);
61 
62         pool = new ConnectionPool!Connection(
63                 () @safe { return connFactory(); },
64                 connNum
65             );
66     }
67 
68     /// Get connection from the pool.
69     LockedConnection lockConnection()
70     {
71         logDebugV("get connection from the pool");
72 
73         return pool.lockConnection();
74     }
75 
76     ///
77     Connection createConnection(in ClientSettings cs) @safe
78     {
79         return new Connection(cs);
80     }
81 }
82 
83 alias Connection = Dpq2Connection;
84 deprecated("use Connection instead") alias __Conn = Connection;
85 
86 ///
87 alias LockedConnection = VibeLockedConnection!Connection;
88 
89 /**
90  * dpq2.Connection adopted for using with Vibe.d
91  */
92 class Dpq2Connection : dpq2.Connection
93 {
94     Duration socketTimeout = dur!"seconds"(10); ///
95     Duration statementTimeout = dur!"seconds"(30); ///
96 
97     private const ClientSettings* settings;
98 
99     ///
100     this(const ref ClientSettings settings) @trusted
101     {
102         this.settings = &settings;
103 
104         super(settings.connString);
105         setClientEncoding("UTF8"); // TODO: do only if it is different from UTF8
106 
107         import std.conv: to;
108         logDebugV("creating new connection, delegate isNull="~(settings.afterStartConnectOrReset is null).to!string);
109 
110         if(settings.afterStartConnectOrReset !is null)
111             settings.afterStartConnectOrReset(this);
112     }
113 
114     ///
115     override void resetStart()
116     {
117         super.resetStart;
118 
119         if(settings.afterStartConnectOrReset !is null)
120             settings.afterStartConnectOrReset(this);
121     }
122 
123     private void waitEndOfRead(in Duration timeout) // TODO: rename to waitEndOf + add FileDescriptorEvent.Trigger argument
124     {
125         import vibe.core.core;
126 
127         version(Posix)
128         {
129             import core.sys.posix.fcntl;
130             assert((fcntl(this.posixSocket, F_GETFL, 0) & O_NONBLOCK), "Socket assumed to be non-blocking already");
131         }
132 
133         version(Have_vibe_core)
134         {
135             // vibe-core right now supports only read trigger event
136             // it also closes the socket on scope exit, thus a socket duplication here
137             auto event = createFileDescriptorEvent(this.posixSocketDuplicate, FileDescriptorEvent.Trigger.read);
138         }
139         else
140         {
141             auto event = createFileDescriptorEvent(this.posixSocket, FileDescriptorEvent.Trigger.any);
142             scope(exit) destroy(event); // Prevents 100% CPU usage
143         }
144 
145         do
146         {
147             if(!event.wait(timeout))
148                 throw new PostgresClientTimeoutException(__FILE__, __LINE__);
149 
150             consumeInput();
151         }
152         while (this.isBusy); // wait until PQgetresult won't block anymore
153     }
154 
155     private void doQuery(void delegate() doesQueryAndCollectsResults)
156     {
157         // Try to get usable connection and send SQL command
158         while(true)
159         {
160             if(status() == CONNECTION_BAD)
161                 throw new ConnectionException(this, __FILE__, __LINE__);
162 
163             if(poll() != PGRES_POLLING_OK)
164             {
165                 waitEndOfRead(socketTimeout);
166                 continue;
167             }
168             else
169             {
170                 break;
171             }
172         }
173 
174         logDebugV("doesQuery() call");
175         doesQueryAndCollectsResults();
176     }
177 
178     private immutable(Result) runStatementBlockingManner(void delegate() sendsStatement)
179     {
180         logDebugV("runStatementBlockingManner");
181         immutable(Result)[] res;
182 
183         doQuery(()
184             {
185                 sendsStatement();
186 
187                 try
188                 {
189                     waitEndOfRead(statementTimeout);
190                 }
191                 catch(PostgresClientTimeoutException e)
192                 {
193                     logDebugV("Exceeded Posgres query time limit");
194 
195                     try
196                         cancel(); // cancel sql query
197                     catch(CancellationException ce) // means successful cancellation
198                         e.msg ~= ", "~ce.msg;
199 
200                     throw e;
201                 }
202                 finally
203                 {
204                     logDebugV("consumeInput()");
205                     consumeInput();
206 
207                     while(true)
208                     {
209                         logDebugV("getResult()");
210                         auto r = getResult();
211                         if(r is null) break;
212                         res ~= r;
213                     }
214                 }
215             }
216         );
217 
218         /*
219          I am trying to check connection status with PostgreSQL server
220          with PQstatus and it always always return CONNECTION_OK even
221          when the cable to the server is unplugged.
222                                     – user1972556 (stackoverflow.com)
223 
224          ...the idea of testing connections is fairly silly, since the
225          connection might die between when you test it and when you run
226          your "real" query. Don't test connections, just use them, and
227          if they fail be prepared to retry everything since you opened
228          the transaction. – Craig Ringer Jan 14 '13 at 2:59
229          */
230         if(status == CONNECTION_BAD)
231             throw new ConnectionException(this, __FILE__, __LINE__);
232 
233         enforce(res.length == 1, "Simple query can return only one Result instance, not "~res.length.to!string);
234 
235         return res[0];
236     }
237 
238     ///
239     immutable(Answer) execStatement(
240         string sqlCommand,
241         ValueFormat resultFormat = ValueFormat.BINARY
242     )
243     {
244         QueryParams p;
245         p.resultFormat = resultFormat;
246         p.sqlCommand = sqlCommand;
247 
248         return execStatement(p);
249     }
250 
251     ///
252     immutable(Answer) execStatement(in ref QueryParams params)
253     {
254         auto res = runStatementBlockingManner({ sendQueryParams(params); });
255 
256         return res.getAnswer;
257     }
258 
259     ///
260     void prepareStatement(
261         string statementName,
262         string sqlStatement,
263         Oid[] oids = null
264     )
265     {
266         auto r = runStatementBlockingManner(
267                 {sendPrepare(statementName, sqlStatement, oids);}
268             );
269 
270         if(r.status != PGRES_COMMAND_OK)
271             throw new ResponseException(r, __FILE__, __LINE__);
272     }
273 
274     ///
275     immutable(Answer) execPreparedStatement(in ref QueryParams params)
276     {
277         auto res = runStatementBlockingManner({ sendQueryPrepared(params); });
278 
279         return res.getAnswer;
280     }
281 
282     ///
283     immutable(Answer) describePreparedStatement(string preparedStatementName)
284     {
285         auto res = runStatementBlockingManner({ sendDescribePrepared(preparedStatementName); });
286 
287         return res.getAnswer;
288     }
289 }
290 
291 ///
292 class PostgresClientTimeoutException : Dpq2Exception
293 {
294     this(string file, size_t line)
295     {
296         super("Exceeded Posgres query time limit", file, line);
297     }
298 }
299 
300 unittest
301 {
302     bool raised = false;
303 
304     try
305     {
306         auto client = new PostgresClient("wrong connect string", 2);
307     }
308     catch(ConnectionException e)
309         raised = true;
310 
311     assert(raised);
312 }
313 
314 version(IntegrationTest) void __integration_test(string connString)
315 {
316     setLogLevel = LogLevel.debugV;
317 
318     auto client = new PostgresClient(connString, 3);
319     auto conn = client.lockConnection();
320 
321     {
322         auto res = conn.execStatement(
323             "SELECT 123::integer, 567::integer, 'asd fgh'::text",
324             ValueFormat.BINARY
325         );
326 
327         assert(res.getAnswer[0][1].as!PGinteger == 567);
328     }
329 
330     {
331         conn.prepareStatement("stmnt_name", "SELECT 123::integer");
332 
333         bool throwFlag = false;
334 
335         try
336             conn.prepareStatement("wrong_stmnt", "WRONG SQL STATEMENT");
337         catch(ResponseException)
338             throwFlag = true;
339 
340         assert(throwFlag);
341     }
342 
343     {
344         import dpq2.oids: OidType;
345 
346         auto a = conn.describePreparedStatement("stmnt_name");
347 
348         assert(a.nParams == 0);
349         assert(a.OID(0) == OidType.Int4);
350     }
351 
352     {
353         QueryParams p;
354         p.preparedStatementName = "stmnt_name";
355 
356         auto r = conn.execPreparedStatement(p);
357 
358         assert(r.getAnswer[0][0].as!PGinteger == 123);
359     }
360 
361     {
362         // Fibers test
363         import vibe.core.concurrency;
364 
365         auto future0 = async({
366             auto conn = client.lockConnection;
367             immutable answer = conn.execStatement("SELECT 'New connection 0'");
368             destroy(conn);
369             return 1;
370         });
371 
372         auto future1 = async({
373             auto conn = client.lockConnection;
374             immutable answer = conn.execStatement("SELECT 'New connection 1'");
375             destroy(conn);
376             return 1;
377         });
378 
379         immutable answer = conn.execStatement("SELECT 'Old connection'");
380 
381         assert(future0 == 1);
382         assert(future1 == 1);
383         assert(answer.length == 1);
384     }
385 
386     {
387         assert(conn.escapeIdentifier("abc") == "\"abc\"");
388     }
389 
390     destroy(conn);
391 }