/* * This is the PostgreSQL direct network module for Pike. */ //! This is an interface to the PostgreSQL database //! server. This module is independent of any external libraries. //! Note that you @b{do not@} need to have a //! PostgreSQL server running on @b{your@} host to use this module: you can //! connect to the database over a TCP/IP socket on a different host. //! //! This module replaces the functionality of the older @[Sql.postgres] //! and @[Postgres.postgres] modules. //! //! This module supports the following features: //! @ul //! @item //! PostgreSQL network protocol version 3, authentication methods //! currently supported are: cleartext, md5 and scram (recommended). //! @item //! Optional asynchronous query interface through callbacks. //! @item //! Streaming queries which do not buffer the whole resultset in memory. //! @item //! Automatic binary transfers to and from the database for most common //! datatypes (amongst others: integer, text and bytea types). //! @item //! Automatic character set conversion and native wide string support. //! Supports UTF8/Unicode for multibyte characters, and all single-byte //! character sets supported by the database. //! @item //! SQL-injection protection by allowing just one statement per query //! and ignoring anything after the first (unquoted) semicolon in the query. //! @item //! COPY support for streaming up- and download. //! @item //! Accurate error messages. //! @item //! Automatic precompilation of complex queries (session cache). //! @item //! Multiple simultaneous queries on the same database connection. //! @item //! Cancelling of long running queries by force or by timeout. //! @item //! Event driven NOTIFY. //! @item //! SSL encrypted connections (optional or forced). //! @endul //! Check the PostgreSQL documentation for further details. //! //! @note //! Multiple simultaneous queries on the same database connection are a //! feature that none of the other database drivers for Pike support. //! So, although it's efficient, its use will make switching database drivers //! difficult. //! //! @seealso //! @[Sql.Sql], @[Sql.postgres], //! @url{https://www.postgresql.org/docs/current/static/@} #pike __REAL_VERSION__ #require constant(Thread.Thread) #include "pgsql.h" #define ERROR(X ...) predef::error(X) private .pgsql_util.proxy proxy; private int pstmtcount; private int ptstmtcount; // Periodically one would like to reset these // but checking when this is safe to do // probably is more costly than the gain #ifdef PG_STATS private int skippeddescribe; // Number of times we skipped Describe phase private int portalsopened; // Number of portals opened private int prepstmtused; // Number of times we used prepared statements #endif private int cachedepth = STATEMENTCACHEDEPTH; private int portalbuffersize = PORTALBUFFERSIZE; private int timeout = QUERYTIMEOUT; private array connparmcache; private int reconnected; private int lastping = time(1); private Thread.Condition resynced; private Thread.Mutex resyncmux; protected string _sprintf(int type) { string res; switch(type) { case 'O': res = sprintf(DRIVERNAME"(%s@%s:%d/%s,%d,%d)", proxy.user, proxy.host, proxy.port, proxy.database, proxy.c?->socket && proxy.c->socket->query_fd(), proxy.backendpid); break; } return res; } //! With no arguments, this function initialises a connection to the //! PostgreSQL backend. Since PostgreSQL requires a database to be //! selected, it will try to connect to the default database. The //! connection may fail however, for a variety of reasons; in this case //! the most likely reason is because you don't have sufficient privileges //! to connect to that database. So use of this particular syntax is //! discouraged. //! //! @param host //! Should either contain @expr{"hostname"@} or //! @expr{"hostname:portname"@}. This allows you to specify the TCP/IP //! port to connect to. If the parameter is @expr{0@} or @expr{""@}, //! it will try to connect to localhost, default port. //! //! @param database //! Specifies the database to connect to. Not specifying this is //! only supported if the PostgreSQL backend has a default database //! configured. If you do not want to connect to any live database, //! you can use @expr{"template1"@}. //! //! @param options //! Currently supports at least the following: //! @mapping //! @member int "reconnect" //! Set it to zero to disable automatic reconnects upon losing //! the connection to the database. Not setting it, or setting //! it to one, will cause one timed reconnect to take place. //! Setting it to -1 will cause the system to try and reconnect //! indefinitely. //! @member int "use_ssl" //! If the database supports and allows SSL connections, the session //! will be SSL encrypted, if not, the connection will fallback //! to plain unencrypted. //! @member int "force_ssl" //! If the database supports and allows SSL connections, the session //! will be SSL encrypted, if not, the connection will abort. //! @member int "text_query" //! Send queries to and retrieve results from the database using text //! instead of the, generally more efficient, default native binary method. //! Turning this on will allow multiple statements per query separated //! by semicolons (not recommended). //! @member int "sync_parse" //! Set it to zero to turn synchronous parsing off for statements. //! Setting this to off can cause surprises because statements could //! be parsed before the previous statements have been executed //! (e.g. references to temporary tables created in the preceding //! statement), //! but it can speed up parsing due to increased parallelism. //! @member int "cache_autoprepared_statements" //! If set to one, it enables the automatic statement prepare and //! cache logic; caching prepared statements can be problematic //! when stored procedures and tables are redefined which leave stale //! references in the already cached prepared statements. //! The default is off, because PostgreSQL 10.1 (at least) //! has a bug that makes it spike to 100% CPU sometimes when this is on. //! @member string "client_encoding" //! Character encoding for the client side, it defaults to using //! the default encoding specified by the database, e.g. //! @expr{"UTF8"@} or @expr{"SQL_ASCII"@}. //! @member string "standard_conforming_strings" //! When on, backslashes in strings must not be escaped any longer, //! @[quote()] automatically adjusts quoting strategy accordingly. //! @member string "escape_string_warning" //! When on, a warning is issued if a backslash (\) appears in an //! ordinary string literal and @expr{"standard_conforming_strings"@} //! is off, defaults to on. //! @endmapping //! For the numerous other options please check the PostgreSQL manual. //! //! @note //! You need to have a database selected before using the SQL-object, //! otherwise you'll get exceptions when you try to query it. Also //! notice that this function @b{can@} raise exceptions if the db //! server doesn't respond, if the database doesn't exist or is not //! accessible to you. //! //! @note //! It is possible that the exception from a failed connect //! will not be triggered on this call (because the connect //! proceeds asynchronously in the background), but on the first //! attempt to actually use the database instead. //! //! @seealso //! @[Postgres.postgres], @[Sql.Sql], @[select_db()], //! @url{https://www.postgresql.org/docs/current/static/runtime-config-client.html@} protected void create(void|string host, void|string database, void|string user, void|string pass, void|mapping(string:mixed) options) { string spass = pass && pass != "" ? Standards.IDNA.to_ascii(pass) : pass; if(pass) { String.secure(pass); pass = "CENSORED"; } connparmcache = ({ host, database, user && user != "" ? Standards.IDNA.to_ascii(user, 1) : user, spass, options || ([])}); proxy = .pgsql_util.proxy(@connparmcache); } //! @returns //! The textual description of the last //! server-related error. Returns @expr{0@} if no error has occurred //! yet. It is not cleared upon reading (can be invoked multiple //! times, will return the same result until a new error occurs). //! //! During the execution of a statement, this function accumulates all //! non-error messages (notices, warnings, etc.). If a statement does not //! generate any errors, this function will return all collected messages //! since the last statement. //! //! @note //! The string returned is not newline-terminated. //! //! @param clear //! To clear the error, set it to @expr{1@}. //! //! @seealso //! @[big_query()] /*semi*/final string error(void|int clear) { mixed err = catch(throwdelayederror(proxy)); if (!clear && err) throw(err); return proxy.geterror(clear); } //! This function returns a string describing what host are we talking to, //! and how (TCP/IP or UNIX socket). //! //! @seealso //! @[server_info()] /*semi*/final string host_info() { return proxy.host_info(); } //! Returns true if the connection seems to be open. //! //! @note //! This function only checks that there's an open connection, //! and that the other end hasn't closed it yet. No data is //! sent over the connection. //! //! For a more reliable check of whether the connection //! is alive, please use @[ping()] instead. //! //! @seealso //! @[ping()] /*semi*/final int is_open() { return proxy.is_open(); } //! Check whether the connection is alive. //! //! @returns //! Returns one of the following: //! @int //! @value 0 //! Everything ok. //! @value -1 //! The server has gone away, and the connection is dead. //! @endint //! //! @seealso //! @[is_open()] /*semi*/final int ping() { int t, ret; waitauthready(); if ((ret = is_open()) // Pinging more frequently than MINPINGINTERVAL seconds // is suppressed to avoid artificial TCP-ACK latency && (t = time(1)) - lastping > MINPINGINTERVAL && (ret = !catch(proxy.c->start()->sendcmd(FLUSHSEND)))) lastping = t; return ret ? !!reconnected : -1; } //! Cancels all currently running queries in this session. //! //! @seealso //! @[reload()], @[resync()] //! //! @note //! This function is PostgreSQL-specific. /*semi*/final void cancelquery() { proxy.cancelquery(); } //! Changes the connection charset. When set to @expr{"UTF8"@}, the query, //! parameters and results can be Pike-native wide strings. //! //! @param charset //! A PostgreSQL charset name. //! //! @seealso //! @[get_charset()], @[create()], //! @url{https://www.postgresql.org/docs/current/static/multibyte.html@} /*semi*/final void set_charset(string charset) { if(charset) textquery(sprintf("SET CLIENT_ENCODING TO '%s'", quote(charset))); } //! @returns //! The PostgreSQL name for the current connection charset. //! //! @seealso //! @[set_charset()], @[getruntimeparameters()], //! @url{https://www.postgresql.org/docs/current/static/multibyte.html@} /*semi*/final string get_charset() { return getruntimeparameters()[CLIENT_ENCODING]; } //! @returns //! Currently active runtimeparameters for //! the open session; these are initialised by the @tt{options@} parameter //! during session creation, and then processed and returned by the server. //! Common values are: //! @mapping //! @member string "client_encoding" //! Character encoding for the client side, e.g. //! @expr{"UTF8"@} or @expr{"SQL_ASCII"@}. //! @member string "server_encoding" //! Character encoding for the server side as determined when the //! database was created, e.g. @expr{"UTF8"@} or @expr{"SQL_ASCII"@}. //! @member string "DateStyle" //! Date parsing/display, e.g. @expr{"ISO, DMY"@}. //! @member string "TimeZone" //! Default timezone used by the database, e.g. @expr{"localtime"@}. //! @member string "standard_conforming_strings" //! When on, backslashes in strings must not be escaped any longer. //! @member string "session_authorization" //! Displays the authorisationrole which the current session runs under. //! @member string "is_superuser" //! Indicates if the current authorisationrole has database-superuser //! privileges. //! @member string "integer_datetimes" //! Reports wether the database supports 64-bit-integer dates and times. //! @member string "server_version" //! Shows the server version, e.g. @expr{"8.3.3"@}. //! @endmapping //! //! The values can be changed during a session using SET commands to the //! database. //! For other runtimeparameters check the PostgreSQL documentation. //! //! @seealso //! @url{https://www.postgresql.org/docs/current/static/runtime-config-client.html@} //! //! @note //! This function is PostgreSQL-specific. /*semi*/final mapping(string:string) getruntimeparameters() { waitauthready(); return proxy.runtimeparameter + ([]); } //! @returns //! A set of statistics for the current session: //! @mapping //! @member int "warnings_dropped" //! Number of warnings/notices generated by the database but not //! collected by the application by using @[error()] after the statements //! that generated them. //! @member int "skipped_describe_count" //! Number of times the driver skipped asking the database to //! describe the statement parameters because it was already cached. //! Only available if PG_STATS is compile-time enabled. //! @member int "used_prepared_statements" //! Number of times prepared statements were used from cache instead of //! reparsing in the current session. //! Only available if PG_STATS is compile-time enabled. //! @member int "current_prepared_statements" //! Cache size of currently prepared statements. //! @member int "current_prepared_statement_hits" //! Sum of the number hits on statements in the current statement cache. //! @member int "prepared_statement_count" //! Total number of prepared statements generated. //! @member int "portals_opened_count" //! Total number of portals opened, i.e. number of statements issued //! to the database. //! Only available if PG_STATS is compile-time enabled. //! @member int "bytes_received" //! Total number of bytes received from the database so far. //! @member int "messages_received" //! Total number of messages received from the database (one SQL-statement //! requires multiple messages to be exchanged). //! @member int "portals_in_flight" //! Currently still open portals, i.e. running statements. //! @endmapping //! //! @note //! This function is PostgreSQL-specific. /*semi*/final mapping(string:mixed) getstatistics() { mapping(string:mixed) stats = ([ "warnings_dropped":proxy.warningsdropcount, "current_prepared_statements":sizeof(proxy.prepareds), "current_prepared_statement_hits":proxy.totalhits, "prepared_statement_count":pstmtcount, #ifdef PG_STATS "used_prepared_statements":prepstmtused, "skipped_describe_count":skippeddescribe, "portals_opened_count":portalsopened, #endif "messages_received":proxy.msgsreceived, "bytes_received":proxy.bytesreceived, ]); return stats; } //! @param newdepth //! Sets the new cachedepth for automatic caching of prepared statements. //! //! @returns //! The previous cachedepth. //! //! @note //! This function is PostgreSQL-specific. /*semi*/final int setcachedepth(void|int newdepth) { int olddepth = cachedepth; if (!undefinedp(newdepth) && newdepth >= 0) cachedepth = newdepth; return olddepth; } //! @param newtimeout //! Sets the new timeout for long running queries. //! //! @returns //! The previous timeout. //! //! @note //! This function is PostgreSQL-specific. /*semi*/final int settimeout(void|int newtimeout) { int oldtimeout = timeout; if (!undefinedp(newtimeout) && newtimeout > 0) timeout = newtimeout; return oldtimeout; } //! @param newportalbuffersize //! Sets the new portalbuffersize for buffering partially concurrent queries. //! //! @returns //! The previous portalbuffersize. //! //! @note //! This function is PostgreSQL-specific. /*semi*/final int setportalbuffersize(void|int newportalbuffersize) { int oldportalbuffersize = portalbuffersize; if (!undefinedp(newportalbuffersize) && newportalbuffersize>0) portalbuffersize = newportalbuffersize; return oldportalbuffersize; } //! @param newfetchlimit //! Sets the new fetchlimit to interleave queries. //! //! @returns //! The previous fetchlimit. //! //! @note //! This function is PostgreSQL-specific. /*semi*/final int setfetchlimit(void|int newfetchlimit) { int oldfetchlimit = proxy._fetchlimit; if (!undefinedp(newfetchlimit) && newfetchlimit >= 0) proxy._fetchlimit = newfetchlimit; return oldfetchlimit; } private string glob2reg(string glob) { if (!glob || !sizeof(glob)) return "%"; return replace(glob, ({"*", "?", "\\", "%", "_"}), ({"%", "_", "\\\\", "\\%", "\\_"})); } private void waitauthready() { if (proxy.waitforauthready) { PD("%d Wait for auth ready %O\n", proxy.c?->socket && proxy.c->socket->query_fd(), backtrace()[-2]); Thread.MutexKey lock = proxy.shortmux->lock(); catch(PT(proxy.waitforauthready->wait(lock))); PD("%d Wait for auth ready released.\n", proxy.c?->socket && proxy.c->socket->query_fd()); } } //! Closes the connection to the database, any running queries are //! terminated instantly. //! //! @note //! This function is PostgreSQL-specific. /*semi*/final void close() { proxy.close(); } protected void destroy() { destruct(proxy); } //! For PostgreSQL this function performs the same function as @[resync()]. //! //! @seealso //! @[resync()], @[cancelquery()] /*semi*/final void reload() { resync(); } private .pgsql_util.sql_result textquery(string q) { return big_query(q, (["_text":1])); } private void resyncdone() { Thread.MutexKey lock = resyncmux->lock(); resynced.signal(); // Allow resync() to continue } private void reset_dbsession() { mixed err = catch { proxy.statementsinflight->wait_till_drained(); error(1); .pgsql_util.sql_result res = textquery("ROLLBACK;RESET ALL;CLOSE ALL;DISCARD TEMP"); while (res->fetch_row()); }; if (err && !proxy.delayederror) proxy.delayederror = err; resyncdone(); } private void resync_cb() { switch (proxy.backendstatus) { default: resyncdone(); break; case 'T':case 'E': foreach (proxy.prepareds; ; mapping tp) { m_delete(tp, "datatypeoid"); m_delete(tp, "datarowdesc"); m_delete(tp, "datarowtypes"); } Thread.Thread(reset_dbsession); // Urgently and deadlockfree } } //! Resyncs the database session; typically used to make sure the session is //! not still in a dangling transaction. //! //! If called while the connection is in idle state, the function is //! lightweight and briefly touches base with the database server to //! make sure client and server are in sync. //! //! If issued while inside a transaction, it will rollback the transaction, //! close all open cursors, drop all temporary tables and reset all //! session variables to their default values. //! //! @note //! This function @b{can@} raise exceptions. //! //! @seealso //! @[cancelquery()], @[reload()] //! //! @note //! This function is PostgreSQL-specific. /*semi*/final void resync() { mixed err; if (is_open()) { err = catch { PD("Statementsinflight: %d Portalsinflight: %d\n", proxy.statementsinflight, proxy.portalsinflight); if(!proxy.waitforauthready) { if (!resynced) { resynced = Thread.Condition(); resyncmux = Thread.Mutex(); } Thread.MutexKey lock = resyncmux->lock(); proxy.readyforquery_cb = resync_cb; proxy.sendsync(); resynced.wait(lock); // Wait for the db to finish } return; }; PD("%O\n", err); } if (proxy && proxy.lastmessage && sizeof(proxy.lastmessage)) ERROR(proxy.a2nls(proxy.lastmessage)); } //! Due to restrictions of the Postgres frontend-backend protocol, you always //! already have to be connected to a database. //! To connect to a different database you have to select the right //! database while connecting instead. This function is a no-op when //! specifying the same database, and throws an error otherwise. //! //! @seealso //! @[create()] /*semi*/final void select_db(string dbname) { if (proxy.database != dbname) ERROR("Cannot switch databases from %O to %O" " in an already established connection\n", proxy.database, dbname); } //! With PostgreSQL you can LISTEN to NOTIFY events. //! This function allows you to detect and handle such events. //! //! @param condition //! Name of the notification event we're listening //! to. A special case is the empty string, which matches all events, //! and can be used as fallback function which is called only when the //! specific condition is not handled. Another special case is //! @expr{"_lost"@} which gets called whenever the connection //! to the database unexpectedly drops. //! //! @param notify_cb //! Function to be called on receiving a notification-event of //! condition @ref{condition@}. //! The callback function is invoked with //! @expr{void notify_cb(pid,condition,extrainfo, .. args);@} //! @tt{pid@} is the process id of the database session that originated //! the event. @tt{condition@} contains the current condition. //! @tt{extrainfo@} contains optional extra information specified by //! the database. //! The rest of the arguments to @ref{notify_cb@} are passed //! verbatim from @ref{args@}. //! The callback function must return no value. //! //! @param selfnotify //! Normally notify events generated by your own session are ignored. //! If you want to receive those as well, set @ref{selfnotify@} to one. //! //! @param args //! Extra arguments to pass to @ref{notify_cb@}. //! //! @note //! This function is PostgreSQL-specific. /*semi*/final void set_notify_callback(string condition, void|function(int,string,string,mixed ...:void) notify_cb, void|int selfnotify, mixed ... args) { if (!notify_cb) m_delete(proxy.notifylist, condition); else { array old = proxy.notifylist[condition]; if (!old) old = ({notify_cb}); if (selfnotify || args) old += ({selfnotify}); if (args) old += args; proxy.notifylist[condition] = old; } } //! @returns //! The given string, but escapes/quotes all contained magic characters //! according to the quoting rules of the current session for non-binary //! arguments in textual SQL-queries. //! //! @note //! Quoting must not be done for parameters passed in bindings. //! //! @seealso //! @[big_query()], @[quotebinary()], @[create()] /*semi*/final string quote(string s) { waitauthready(); string r = proxy.runtimeparameter.standard_conforming_strings; if (r && r == "on") return replace(s, "'", "''"); return replace(s, ({ "'", "\\" }), ({ "''", "\\\\" }) ); } //! @returns //! The given string, but escapes/quotes all contained magic characters //! for binary (bytea) arguments in textual SQL-queries. //! //! @note //! Quoting must not be done for parameters passed in bindings. //! //! @seealso //! @[big_query()], @[quote()] //! //! @note //! This function is PostgreSQL-specific. /*semi*/final string quotebinary(string s) { return replace(s, ({ "'", "\\", "\0" }), ({ "''", "\\\\", "\\000" }) ); } //! This function creates a new database (assuming we //! have sufficient privileges to do this). //! //! @param db //! Name of the new database. //! //! @seealso //! @[drop_db()] /*semi*/final void create_db(string db) { textquery(sprintf("CREATE DATABASE %s", db)); } //! This function destroys a database and all the data it contains (assuming //! we have sufficient privileges to do so). It is not possible to delete //! the database you're currently connected to. You can connect to database //! @expr{"template1"@} to avoid connecting to any live database. //! //! @param db //! Name of the database to be deleted. //! //! @seealso //! @[create_db()] /*semi*/final void drop_db(string db) { textquery(sprintf("DROP DATABASE %s", db)); } //! @returns //! A string describing the server we are //! talking to. It has the form @expr{"servername/serverversion"@} //! (like the HTTP protocol description) and is most useful in //! conjunction with the generic SQL-server module. //! //! @seealso //! @[host_info()] /*semi*/final string server_info () { waitauthready(); return DRIVERNAME"/" + (proxy.runtimeparameter.server_version || "unknown"); } //! @returns //! An array of the databases available on the server. //! //! @param glob //! If specified, list only those databases matching it. /*semi*/final array(string) list_dbs (void|string glob) { array row, ret = ({}); .pgsql_util.sql_result res=big_query("SELECT d.datname " "FROM pg_database d " "WHERE d.datname ILIKE :glob " "ORDER BY d.datname", ([":glob":glob2reg(glob)])); while(row = res->fetch_row()) ret += ({row[0]}); return ret; } //! @returns //! An array containing the names of all the tables and views in the //! path in the currently selected database. //! //! @param glob //! If specified, list only the tables with matching names. /*semi*/final array(string) list_tables (void|string glob) { array row, ret = ({}); .pgsql_util.sql_result res = big_query( // due to missing schemasupport // This query might not work on PostgreSQL 7.4 "SELECT CASE WHEN 'public'=n.nspname THEN '' ELSE n.nspname||'.' END " " ||c.relname AS name " "FROM pg_catalog.pg_class c " " LEFT JOIN pg_catalog.pg_namespace n ON n.oid=c.relnamespace " "WHERE c.relkind IN ('r','v') AND n.nspname<>'pg_catalog' " " AND n.nspname !~ '^pg_toast' AND pg_catalog.pg_table_is_visible(c.oid) " " AND c.relname ILIKE :glob " " ORDER BY 1", ([":glob":glob2reg(glob)])); while(row = res->fetch_row()) ret += ({row[0]}); return ret; } //! @returns //! A mapping, indexed on the column name, of mappings describing //! the attributes of a table of the current database. //! The currently defined fields are: //! //! @mapping //! @member string "schema" //! Schema the table belongs to //! @member string "table" //! Name of the table //! @member string "kind" //! Type of table //! @member string "owner" //! Tableowner //! @member int "rowcount" //! Estimated rowcount of the table //! @member int "datasize" //! Estimated total datasize of the table in bytes //! @member int "indexsize" //! Estimated total indexsize of the table in bytes //! @member string "name" //! Name of the column //! @member string "type" //! A textual description of the internal (to the server) column type-name //! @member int "typeoid" //! The OID of the internal (to the server) column type //! @member string "length" //! Size of the columndatatype //! @member mixed "default" //! Default value for the column //! @member int "is_shared" //! @member int "has_index" //! If the table has any indices //! @member int "has_primarykey" //! If the table has a primary key //! @endmapping //! //! @param glob //! If specified, list only the tables with matching names. //! Setting it to @expr{*@} will include system columns in the list. /*semi*/final array(mapping(string:mixed)) list_fields(void|string table, void|string glob) { array row, ret = ({}); string schema; sscanf(table||"*", "%s.%s", schema, table); .pgsql_util.sql_result res = big_typed_query( "SELECT a.attname, a.atttypid, t.typname, a.attlen, " " c.relhasindex, c.relhaspkey, CAST(c.reltuples AS BIGINT) AS reltuples, " " (c.relpages " " +COALESCE( " " (SELECT SUM(tst.relpages) " " FROM pg_catalog.pg_class tst " " WHERE tst.relfilenode=c.reltoastrelid) " " ,0) " " )*8192::BIGINT AS datasize, " " (COALESCE( " " (SELECT SUM(pin.relpages) " " FROM pg_catalog.pg_index pi " " JOIN pg_catalog.pg_class pin ON pin.relfilenode=pi.indexrelid " " WHERE pi.indrelid IN (c.relfilenode,c.reltoastrelid)) " " ,0) " " )*8192::BIGINT AS indexsize, " " c.relisshared, t.typdefault, " " n.nspname, c.relname, " " CASE c.relkind " " WHEN 'r' THEN 'table' " " WHEN 'v' THEN 'view' " " WHEN 'i' THEN 'index' " " WHEN 'S' THEN 'sequence' " " WHEN 's' THEN 'special' " " WHEN 't' THEN 'toastable' " // pun intended :-) " WHEN 'c' THEN 'composite' " " ELSE c.relkind::TEXT END AS relkind, " " r.rolname " "FROM pg_catalog.pg_class c " " LEFT JOIN pg_catalog.pg_namespace n ON n.oid=c.relnamespace " " JOIN pg_catalog.pg_roles r ON r.oid=c.relowner " " JOIN pg_catalog.pg_attribute a ON c.oid=a.attrelid " " JOIN pg_catalog.pg_type t ON a.atttypid=t.oid " "WHERE c.relname ILIKE :table AND " " (n.nspname ILIKE :schema OR " " :schema IS NULL " " AND n.nspname<>'pg_catalog' AND n.nspname !~ '^pg_toast') " " AND a.attname ILIKE :glob " " AND (a.attnum>0 OR '*'=:realglob) " "ORDER BY n.nspname,c.relname,a.attnum,a.attname", ([":schema":glob2reg(schema),":table":glob2reg(table), ":glob":glob2reg(glob),":realglob":glob])); array colnames=res->fetch_fields(); { mapping(string:string) renames = ([ "attname":"name", "nspname":"schema", "relname":"table", "rolname":"owner", "typname":"type", "attlen":"length", "typdefault":"default", "relisshared":"is_shared", "atttypid":"typeoid", "relkind":"kind", "relhasindex":"has_index", "relhaspkey":"has_primarykey", "reltuples":"rowcount", ]); foreach(colnames; int i; mapping m) { string nf, field=m.name; if(nf=renames[field]) field=nf; colnames[i]=field; } } #define delifzero(m, field) if(!(m)[field]) m_delete(m, field) while(row=res->fetch_row()) { mapping m=mkmapping(colnames, row); delifzero(m,"is_shared"); delifzero(m,"has_index"); delifzero(m,"has_primarykey"); delifzero(m,"default"); ret += ({m}); } return ret; } private string trbackendst(int c) { switch(c) { case 'I': return "idle"; case 'T': return "intransaction"; case 'E': return "infailedtransaction"; } return ""; } //! @returns //! The current commitstatus of the connection. Returns either one of: //! @string //! @value idle //! @value intransaction //! @value infailedtransaction //! @endstring //! //! @note //! This function is PostgreSQL-specific. final string status_commit() { return trbackendst(proxy.backendstatus); } private inline void closestatement( .pgsql_util.bufcon|.pgsql_util.conxsess plugbuffer, string oldprep) { .pgsql_util.closestatement(plugbuffer, oldprep); } private inline string int2hex(int i) { return String.int2hex(i); } private inline void throwdelayederror(object parent) { .pgsql_util.throwdelayederror(parent); } private void startquery(int forcetext, .pgsql_util.sql_result portal, string q, mapping(string:mixed) tp, string preparedname) { .pgsql_util.conxion c = proxy.c; if (!c && (proxy.options["reconnect"] || zero_type(proxy.options["reconnect"]))) { sleep(BACKOFFDELAY); // Force a backoff delay if (!proxy.c) { reconnected++; proxy = .pgsql_util.proxy(@connparmcache); } c = proxy.c; } if (forcetext) { // FIXME What happens if portals are still open? portal._unnamedportalkey = proxy.unnamedportalmux->lock(1); portal._portalname = ""; portal->_parseportal(); portal->_bindportal(); proxy.readyforquerycount++; { Thread.MutexKey lock = proxy.unnamedstatement->lock(1); .pgsql_util.conxsess cs = c->start(1); CHAIN(cs)->add_int8('Q')->add_hstring(({q, 0}), 4, 4); cs->sendcmd(FLUSHLOGSEND, portal); } PD("Simple query: %O\n", q); } else { object plugbuffer; portal->_parseportal(); if (!sizeof(preparedname) || !tp || !tp.preparedname) { if (!sizeof(preparedname)) preparedname = (portal._unnamedstatementkey = (proxy.options.cache_autoprepared_statements ? proxy.unnamedstatement->trylock : proxy.unnamedstatement->lock)(1)) ? "" : PTSTMTPREFIX + int2hex(ptstmtcount++); PD("Parse statement %O=%O\n", preparedname, q); plugbuffer = c->start(); CHAIN(plugbuffer)->add_int8('P') ->add_hstring(({preparedname, 0, q, "\0\0\0"}), 4, 4) #if 0 // Even though the protocol doesn't require the Parse command to be // followed by a flush, it makes a VERY noticeable difference in // performance if it is omitted; seems like a flaw in the PostgreSQL // server v8.3.3 // In v8.4 and later, things speed up slightly when it is omitted. ->add(PGFLUSH) #endif ; } else { // Use the name from the cache preparedname = tp.preparedname; // to shortcut a potential race PD("Using prepared statement %s for %O\n", preparedname, q); } portal._preparedname = preparedname; if (!tp || !tp.datatypeoid) { PD("Describe statement %O\n", preparedname); if (!plugbuffer) plugbuffer = c->start(); CHAIN(plugbuffer)->add_int8('D') ->add_hstring(({'S', preparedname, 0}), 4, 4); plugbuffer->sendcmd(FLUSHSEND, portal); } else { if (plugbuffer) plugbuffer->sendcmd(KEEP); #ifdef PG_STATS skippeddescribe++; #endif portal->_setrowdesc(tp.datarowdesc, tp.datarowtypes); } if ((portal._tprepared=tp) && tp.datatypeoid) { mixed e = catch(portal->_preparebind(tp.datatypeoid)); if (!this) // Already destructed? throw(e); if (e) { portal->_purgeportal(); if (!portal.delayederror) throw(e); } } if (!proxy.unnamedstatement) portal._unnamedstatementkey = 0; // Cover for a destruct race } } //! This is the only provided direct interface which allows you to query the //! database. A simpler synchronous interface can be used through @[query()]. //! //! Bindings are supported natively straight across the network. //! Special bindings supported are: //! @mapping //! @member int ":_cache" //! Forces caching on or off for the query at hand. //! @member int ":_text" //! Forces text mode in communication with the database for queries on or off //! for the query at hand. Potentially more efficient than the default //! binary method for simple queries with small or no result sets. //! Note that this mode causes all but the first query result of a list //! of semicolon separated statements to be discarded. //! @member int ":_sync" //! Forces synchronous parsing on or off for statements. //! Setting this to off can cause surprises because statements could //! be parsed before the previous statements have been executed //! (e.g. references to temporary tables created in the preceding //! statement), //! but it can speed up parsing due to increased parallelism. //! @endmapping //! //! @note //! The parameters referenced via the @expr{bindings@}-parameter-mapping //! passed to this function must remain unaltered //! until the parameters have been sent to the database. The driver //! currently does not expose this moment, but to avoid a race condition //! it is sufficient to keep them unaltered until the first resultrow //! has been fetched (or EOF is reached, in case of no resultrows). //! //! @returns //! A @[Sql.pgsql_util.sql_result] object (which conforms to the //! @[Sql.sql_result] standard interface for accessing data). It is //! recommended to use @[query()] for simpler queries (because //! it is easier to handle, but stores all the result in memory), and //! @[big_query()] for queries you expect to return huge amounts of //! data (it's harder to handle, but fetches results on demand). //! //! @note //! This function @b{can@} raise exceptions. //! //! @note //! This function supports multiple simultaneous queries (portals) on a single //! database connection. This is a feature not commonly supported by other //! database backends. //! //! @note //! This function, by default, does not support multiple queries in one //! querystring. //! I.e. it allows for but does not require a trailing semicolon, but it //! simply ignores any commands after the first unquoted semicolon. This can //! be viewed as a limited protection against SQL-injection attacks. //! To make it support multiple queries in one querystring, use the //! @ref{:_text@} option (not recommended). //! //! @seealso //! @[big_typed_query()], @[Sql.Sql], @[Sql.sql_result], //! @[query()], @[Sql.pgsql_util.sql_result] /*semi*/final .pgsql_util.sql_result big_query(string q, void|mapping(string|int:mixed) bindings, void|int _alltyped) { string preparedname = ""; mapping(string:mixed) options = proxy.options; .pgsql_util.conxion c = proxy.c; int forcecache = -1, forcetext = options.text_query; int syncparse = zero_type(options.sync_parse) ? -1 : options.sync_parse; if (proxy.waitforauthready) waitauthready(); string cenc = proxy.runtimeparameter[CLIENT_ENCODING]; switch(cenc) { case UTF8CHARSET: q = string_to_utf8(q); break; default: if (String.width(q) > 8) ERROR("Don't know how to convert %O to %s encoding\n", q, cenc); } array(string|int) paramValues; array from; if (bindings) { if (forcetext) q = .sql_util.emulate_bindings(q, bindings, this), paramValues = ({}); else { int pi = 0; paramValues = allocate(sizeof(bindings)); from = allocate(sizeof(bindings)); array(string) litfrom, litto, to = allocate(sizeof(bindings)); litfrom = ({}); litto = ({}); foreach (bindings; mixed name; mixed value) { if (stringp(name)) { // Throws if mapping key is empty string if (name[0] != ':') name = ":" + name; if (name[1] == '_') { // Special option parameter switch(name) { case ":_cache": forcecache = (int)value; break; case ":_text": forcetext = (int)value; break; case ":_sync": syncparse = (int)value; break; } continue; } if (!has_value(q, name)) continue; } if (multisetp(value)) { // multisets are taken literally litto += ({indices(value)*","}); // and bypass the encoding logic litfrom += ({name}); } else { paramValues[pi] = value; to[pi] = sprintf("$%d", pi + 1); from[pi++] = name; } } if (pi--) { paramValues = paramValues[.. pi]; q = replace(q, litfrom += from = from[.. pi], litto += to = to[.. pi]); } else { paramValues = ({}); if (sizeof(litfrom)) q = replace(q, litfrom, litto); } from = ({from, to, paramValues}); } } else paramValues = ({}); if (String.width(q) > 8) ERROR("Wide string literals in %O not supported\n", q); if (has_value(q, "\0")) ERROR("Querystring %O contains invalid literal nul-characters\n", q); mapping(string:mixed) tp; /* * FIXME What happens with regards to this detection when presented with * multistatement text-queries? * The primary function of this detection is to ensure a SYNC * right after a COMMIT, and no SYNC after a BEGIN. */ int transtype = .pgsql_util.transendprefix->match(q) ? TRANSEND : .pgsql_util.transbeginprefix->match(q) ? TRANSBEGIN : NOTRANS; { int inerror = proxy.backendstatus == 'E'; if (inerror && transtype == TRANSEND) proxy.sendsync(); throwdelayederror(proxy); if (inerror && transtype != TRANSEND) ERROR("Current transaction is aborted, " "commands ignored until end of transaction"); } if (transtype != NOTRANS || .pgsql_util.nodataresprefix->match(q)) tp = .pgsql_util.describenodata; // Description already known else if (!forcetext && forcecache == 1 || forcecache && sizeof(q) >= MINPREPARELENGTH) { object plugbuffer = c->start(); if (tp = proxy.prepareds[q]) { if (tp.preparedname) { #ifdef PG_STATS prepstmtused++; #endif preparedname = tp.preparedname; } else if(tp.trun && tp.tparse*FACTORPLAN >= tp.trun && options.cache_autoprepared_statements) preparedname = PREPSTMTPREFIX + int2hex(pstmtcount++); } else { if (proxy.totalhits >= cachedepth) foreach (proxy.prepareds; string ind; tp) { int oldhits = tp.hits; proxy.totalhits -= oldhits-(tp.hits = oldhits >> 1); if (oldhits <= 1) { closestatement(plugbuffer, tp.preparedname); m_delete(proxy.prepareds, ind); } } if (forcecache != 1 && .pgsql_util.createprefix->match(q)) { PD("Invalidate cache\n"); proxy.invalidatecache = 1; // Flush cache on CREATE tp = 0; } else proxy.prepareds[q] = tp = ([]); } if (proxy.invalidatecache) { proxy.invalidatecache = 0; foreach (proxy.prepareds; ; mapping np) { closestatement(plugbuffer, np.preparedname); m_delete(np, "preparedname"); } } if (sizeof(CHAIN(plugbuffer))) { PD("%O\n", (string)CHAIN(plugbuffer)); plugbuffer->sendcmd(FLUSHSEND); // close expireds } else plugbuffer->sendcmd(KEEP); // close start() } else // sql_result autoassigns to portal tp = 0; .pgsql_util.sql_result portal; portal = .pgsql_util.sql_result(proxy, c, q, portalbuffersize, _alltyped, from, forcetext, timeout, syncparse, transtype); portal._tprepared = tp; #ifdef PG_STATS portalsopened++; #endif proxy.clearmessage = 1; // Do not run a query in the local_backend to prevent deadlocks if (Thread.this_thread() == .pgsql_util.local_backend.executing_thread()) Thread.Thread(startquery, forcetext, portal, q, tp, preparedname); else startquery(forcetext, portal, q, tp, preparedname); if (portal) // Catches race where portal already imploded throwdelayederror(portal); return portal; } //! This is an alias for @[big_query()], since @[big_query()] already supports //! streaming of multiple simultaneous queries through the same connection. //! //! @seealso //! @[big_query()], @[big_typed_query()], @[Sql.Sql], @[Sql.sql_result] /*semi*/final inline .pgsql_util.sql_result streaming_query(string q, void|mapping(string|int:mixed) bindings) { return big_query(q, bindings); } //! This function returns an object that allows streaming and typed //! results. //! //! @seealso //! @[big_query()], @[Sql.Sql], @[Sql.sql_result] /*semi*/final inline .pgsql_util.sql_result big_typed_query(string q, void|mapping(string|int:mixed) bindings) { return big_query(q, bindings, 1); }