Skip to content
Snippets Groups Projects
Select Git revision
  • 6ddc7f1f79376e5f98d7778e7592a38acda37630
  • master default protected
  • 9.0
  • 8.0
  • nt-tools
  • 7.8
  • 7.6
  • 7.4
  • 7.2
  • 7.0
  • 0.6
  • rosuav/latex-markdown-renderer
  • rxnpatch/rxnpatch
  • marcus/gobject-introspection
  • rxnpatch/8.0
  • rosuav/pre-listening-ports
  • rosuav/async-annotations
  • rosuav/pgsql-ssl
  • rxnpatch/rxnpatch-broken/2023-10-06T094250
  • grubba/fdlib
  • grubba/wip/sakura/8.0
  • v8.0.2020
  • v8.0.2018
  • v8.0.2016
  • v8.0.2014
  • v8.0.2012
  • v8.0.2008
  • v8.0.2006
  • v8.0.2004
  • v8.0.2002
  • v8.0.2000
  • v8.0.1998
  • v8.0.1996
  • v8.0.1994
  • v8.0.1992
  • v8.0.1990
  • v8.0.1988
  • v8.0.1986
  • rxnpatch/clusters/8.0/2025-04-29T124414
  • rxnpatch/2025-04-29T124414
  • v8.0.1984
41 results

SqlTable.pike

Blame
  • SqlTable.pike 59.25 KiB
    #pike __REAL_VERSION__
    #if !constant (___Mysql)
    constant this_program_does_not_exist = 1;
    #else  // !___Mysql
    
    //! This class provides some abstractions on top of an SQL table.
    //!
    //! At the core it is generic for any SQL database, but the current
    //! implementation is MySQL specific on some points, notably the
    //! semantics of AUTO_INCREMENT, the quoting method, knowledge about
    //! column types, and some conversion functions. Hence the location in
    //! the @[Mysql] module.
    //!
    //! Among other things, this class handles some convenient conversions
    //! between SQL and pike data types:
    //!
    //! @ul
    //! @item
    //!   Similar to @[Sql.big_typed_query], SQL integer and floating
    //!   point columns are converted to/from pike ints and floats, and
    //!   SQL NULLs are converted to/from the @[Val.null] object.
    //!
    //!   MySQL DECIMAL columns are converted to/from @[Gmp.mpq] objects
    //!   if they have one or more decimal places, otherwise they are
    //!   converted to/from ints.
    //!
    //! @item
    //!   MySQL TIMESTAMP columns are converted to/from pike ints
    //!   containing unix timestamps. This conversion is done on the MySQL
    //!   side using the UNIX_TIMESTAMP and FROM_UNIXTIME functions, which
    //!   means that the conversion is not susceptible to offsets due to
    //!   time zone differences etc. There is however one special case
    //!   here that MySQL doesn't handle cleanly - see note below.
    //!
    //! @item
    //!   Other SQL types are kept in string form. That includes DATE,
    //!   TIME, and DATETIME, which are returned as MySQL formats them.
    //!
    //!   Note that @[Sql.mysql] can handle conversions to/from Unicode
    //!   strings for text data types. If that is enabled then this class
    //!   also supports that conversion.
    //!
    //! @item
    //!   There are debug checks (with the DEBUG define) that verify the
    //!   incoming pike types, to avoid bugs which could otherwise be
    //!   hidden by implicit casts on the SQL side. The date and time
    //!   types (except TIMESTAMP) can be sent either as strings or
    //!   integers (e.g. either "2010-01-01" or 20100101).
    //! @endul
    //!
    //! This class can also optionally simulate an arbitrary set of fields
    //! in each table row: If a field name is the same as a column then
    //! the column is accessed, otherwise it accesses an entry in a
    //! mapping stored in a special BLOB column which is usually called
    //! "properties".
    //!
    //! @note
    //! Although SQL is case insensitive on column names, this class
    //! isn't.
    //!
    //! @note
    //! The generated SQL queries always quote table and column names
    //! according to MySQL syntax using backticks (`). However, literal
    //! backticks in names are not quoted and might therefore lead to SQL
    //! syntax errors. This might change if it becomes a problem.
    //!
    //! @note
    //! The handling of TIMESTAMP columns in MySQL (as of 5.1 at least)
    //! through UNIX_TIMESTAMP and FROM_UNIXTIME has one problem if the
    //! active time zone uses daylight savings time:
    //!
    //! Apparently FROM_UNIXTIME internally formats the integer to a MySQL
    //! date/time string, which is then parsed again to set the unix
    //! timestamp in the TIMESTAMP column. The formatting and the parsing
    //! uses the same time zone, so the conversions generally cancel
    //! themselves out. However, there is one exception with the 1 hour
    //! overlap in the fall when going from summer time to normal time.
    //!
    //! E.g. if the active time zone on the connection is Central European
    //! Time, which uses DST, then setting 1130630400 (Sun 30 Oct 2005
    //! 2:00:00 CEST) through "INSERT INTO foo SET ts =
    //! FROM_UNIXTIME(1130630400)" actually sets the ts column to
    //! 1130634000 (Sun 30 Oct 2005 2:00:00 CET).
    //!
    //! The only way around that problem is apparently to ensure that the
    //! time zone used on the connection is one which doesn't use DST.
    //! E.g. UTC is a reasonable choice, which can be set on the
    //! connection through "SET time_zone = '+00:00'". That is not done
    //! automatically by this class.
    
    #ifdef SQLTOOLS_UPDATE_DEBUG
    #define UPDATE_MSG(X...) werror (X)
    #else
    #define UPDATE_MSG(X...) 0
    #endif
    
    // Ideas for the future:
    // o  Generalize to support different db backends.
    // o  Add hooks to make it possible to cache the record mappings.
    // o  Make it possible to replace the encode/decode functions for the
    //    properties column.
    // o  Make it possible to specify custom conversion functions for
    //    individual fields.
    // o  Add optional caching of compiled queries.
    
    function(void:Sql.Sql) get_db;
    //! Callback to get a database connection.
    
    string table;
    //! The table to query or change. Do not change.
    
    string id_col;
    //! The column containing the AUTO_INCREMENT values (if any).
    
    array(string) pk_cols;
    //! The column(s) containing the primary key, in order. Typically it
    //! is the same as @expr{({@[id_col]})@}.
    
    string prop_col;
    //! The column containing miscellaneous properties. May be zero if
    //! this feature is disabled. Do not change.
    
    int prop_col_max_length;
    //! Maximum length of the value @[prop_col] can hold. Only applicable
    //! if @[prop_col] is set. Do not change.
    
    mapping(string:string) col_types;
    //! Maps the names of the table columns to the types @[SqlTable] will
    //! handle them as. This is queried from the database in @[create]. Do
    //! not change.
    
    protected mapping(string:int(1..1)) timestamp_cols;
    // Mapping tracking all TIMESTAMP columns.
    
    protected mapping(string:int(1..1)) datetime_cols;
    // Mapping tracking all date and time columns (except TIMESTAMP),
    // which can be sent either as strings or integers.
    
    protected string query_charset;
    // Mysql charset to use in queries. This is dictated by the table and
    // column names. If none of them are wide, we always use latin1 to
    // avoid the encoding overhead in Sql.mysql (even though that is not
    // strictly correct - mysql latin1 have some different chars in the
    // high control char range of iso-8859-1, but we ignore that). If wide
    // identifiers are found, this is zero to let Sql.mysql fix the
    // charsets, and it better be in unicode encode mode then. This does
    // not affect string constants since we always specify the charset for
    // every string literal.
    
    // Converts mysql types to pike types for col_types. All types not
    // mentioned here become "string".
    protected constant mysql_types_map = ([
      "float": "float", "double": "float",
      "char": "int", "short": "int",
      "int24": "int", "long": "int", "longlong": "int",
      "timestamp": "int", "bit": "int",
      "decimal": "float", "newdecimal": "float",
    ]);
    
    // Mysql decimal types. These are represented as integers if they have
    // no fraction part, otherwise as Gmp.mpq objects.
    protected constant mysql_decimal_types = ([
      "decimal": 1, "newdecimal": 1,
    ]);
    
    protected constant mysql_datetime_types = ([
      "datetime": 1, "date": 1, "time": 1, "year": 1,
    ]);
    
    // FIXME: Add a setting to be able to "deprecate" columns for
    // migration from a column to a field in the properties blob. The
    // deprecation means the column will be queried, but the value will be
    // put into the properties mapping on update.
    
    // FIXME: Conversely, add a setting to deprecate properties be able to
    // migrate them to columns. Normally it solves itself since properties
    // have priority over columns, but select() and get() need to be aware
    // of such columns so they don't incorrectly skip prop_col altogether
    // in the query.
    
    protected void create (function(void:Sql.Sql) get_db,
    		       string table,
    		       void|string prop_col)
    //! Creates an @[SqlTable] object for accessing (primarily) a specific
    //! table.
    //!
    //! @param get_db
    //! A function that will be called to get a connection to the database
    //! containing the table.
    //!
    //! @param table
    //! The name of the table.
    //!
    //! @param prop_col
    //! The column in which all fields which don't have explicit columns
    //! are stored. It has to be a non-null blob or varbinary column. If
    //! this isn't specified and there is such a column called
    //! "properties" then it is used for this purpose. Set to @expr{"-"@}
    //! to force this feature to be disabled.
    {
      this_program::get_db = get_db;
      this_program::table = table;
    
      Sql.Sql conn = get_db();
    
      query_charset = String.width (table) == 8 && "latin1";
    
      {
        col_types = ([]);
        timestamp_cols = ([]);
        datetime_cols = ([]);
        array(mapping(string:mixed)) col_list =
          conn->list_fields (string_to_utf8 (table));
        mapping(string:mixed) prop_col_info;
    
        foreach (col_list, mapping(string:mixed) col_info) {
          string name = utf8_to_string (col_info->name);
          if (String.width (name) > 8) query_charset = 0;
          if (col_types[name])
    	error ("Strange duplicate column %O in %O\n", name, col_list);
          if (mysql_decimal_types[col_info->type] && !col_info->decimals)
    	col_types[name] = "int";
          else
    	col_types[name] = mysql_types_map[col_info->type] || "string";
          if (col_info->type == "timestamp") timestamp_cols[name] = 1;
          if (mysql_datetime_types[col_info->type]) datetime_cols[name] = 1;
          if (name == (prop_col || "properties"))
    	prop_col_info = col_info;
        }
    
        if (prop_col != "-") {
          if (prop_col_info)
    	if (!prop_col_info->flags->binary ||
    	    !prop_col_info->flags->not_null ||
    	    !(<"var string", "blob">)[prop_col_info->type])
    	  prop_col_info = 0;
          if (prop_col && !prop_col_info)
    	error ("Table doesn't have a non-null binary column %O "
    	       "to store extra fields in.\n", prop_col);
          if (prop_col_info) {
    	if (!prop_col_info->length)
    	  error ("Unable to determine maximum length of the property "
    		 "column %O. Got column info: %O\n", prop_col, prop_col_info);
    	this_program::prop_col = prop_col_info->name;
    	this_program::prop_col_max_length = prop_col_info->length;
          }
        }
      }
    
      foreach (conn->query ("SHOW COLUMNS FROM `" + table + "`"),
    	   mapping(string:string) col_info)
        if (col_info->Extra == "auto_increment") {
          id_col = col_info->Field;
          break;
        }
    
      {
        mapping(int:string) pkc = ([]);
        foreach (conn->query ("SHOW INDEX FROM `" + table + "`"),
    	     mapping(string:string) ind_col)
          if (ind_col->Key_name == "PRIMARY")
    	pkc[(int) ind_col->Seq_in_index] = ind_col->Column_name;
        pk_cols = values (pkc);
        sort (indices (pkc), pk_cols);
      }
    }
    
    protected string _sprintf (int flag)
    {
      return flag == 'O' &&
        sprintf ("%s(%O)",
    	     function_name (object_program (this)) || "SqlTable",
    	     table);
    }
    
    local string quote (string s)
    //! Quotes a string literal for inclusion in an SQL statement, e.g. in
    //! a WHERE clause to @[select].
    //!
    //! @note
    //! Most functions here take raw string literals. Quoting is seldom
    //! necessary.
    {
      // Hardwired mysql quoting since we don't support anything else
      // (yet). The original ought to be in a module.. :P
      return predef::replace(s,
    			 ({ "\\", "\"", "\0", "\'", "\n", "\r" }),
    			 ({ "\\\\", "\\\"", "\\0", "\\\'", "\\n", "\\r" }));
    }
    
    protected int argspec_counter;
    
    local string handle_argspec (array argspec, mapping(string:mixed) bindings)
    //! Helper function for use with array style arguments.
    //!
    //! Many functions in this class can take WHERE expressions etc either
    //! as plain strings or as arrays. In array form, they work like when
    //! @[Sql.Sql.query] is called with more than one argument:
    //!
    //! The first element in the array is a string containing the SQL
    //! snippet. If the second element is a mapping, it's taken as a
    //! bindings mapping, otherwise the remaining elements are formatted
    //! using the first in @expr{sprintf@} fashion. See @[Sql.Sql.query]
    //! for further details.
    //!
    //! This function reduces an argument on array form to a simple
    //! string, combined with bindings. @[bindings] is a mapping that is
    //! modified to contain the new bindings.
    //!
    //! @note
    //! The @[quote] function can be used to quote string literals in the
    //! query, to avoid the array format.
    //!
    //! @returns
    //! Return the SQL snippet in string form, possibly with variable
    //! bindings referring to @[bindings].
    {
      if (sizeof (argspec) == 2 && mappingp (argspec[1])) {
        foreach (argspec[1]; string|int var; mixed val)
          bindings[var] = val;
        return argspec[0];
      }
      else
        return Sql.sql_util.handle_extraargs (argspec[0], argspec[1..],
    					  bindings)[0];
    }
    
    // Implicit connection handling.
    
    int insert (mapping(string:mixed)... records)
    //! Inserts one or more records into the table using an INSERT
    //! command. An SQL error is thrown if a record conflicts with an
    //! existing one.
    //!
    //! A record is represented as a mapping with one entry for each
    //! column or property (if @[prop_col] is used). The values must be of
    //! the right type for the column: Integers for integer columns,
    //! floats for floating point columns, strings for all other data
    //! types, and @[Val.null] for the SQL NULL value.
    //!
    //! If the property feature is used (i.e. if @[prop_col] is set) then
    //! any entries in the record mapping that don't match a column are
    //! treated as properties and are stored encoded in the @[prop_col]
    //! column. Note that column names are matched with case sensitivity.
    //! Properties may store any pike data type (as long as it is accepted
    //! by @[encode_value_canonic]).
    //!
    //! If @[id_col] is set and that column doesn't exist in a record
    //! mapping then the field is added to the mapping with the value that
    //! the record got. This currently only works for the first record
    //! mapping if there are several.
    //!
    //! @returns
    //! The value of the @[id_col] column for the new record. If several
    //! records are inserted at once then the value for the first one is
    //! returned. Zero is returned if there is no @[id_col] column.
    //!
    //! @seealso
    //! @[insert_ignore], @[replace], @[insert_or_update]
    {
      return conn_insert (get_db(), @records);
    }
    
    int insert_ignore (mapping(string:mixed)... records)
    //! Inserts one or more records into the table using an INSERT IGNORE
    //! command. If some of the given records conflict with existing
    //! records then they are ignored.
    //!
    //! Zero is returned if all records were ignored. The record mapping
    //! is updated with the @[id_col] record id only if a single record is
    //! given. Otherwise this function behaves like @[insert].
    //!
    //! @seealso
    //! @[insert], @[replace]
    {
      return conn_insert_ignore (get_db(), @records);
    }
    
    int replace (mapping(string:mixed)... records)
    //! Inserts one or more records into the table using a REPLACE
    //! command. If some of the given records conflict with existing
    //! records then they are replaced.
    //!
    //! Otherwise this function behaves like @[insert].
    //!
    //! @seealso
    //! @[insert], @[insert_ignore]
    {
      return conn_replace (get_db(), @records);
    }
    
    void update (mapping(string:mixed) record,
    	     void|int(0..2) clear_other_fields)
    //! Updates an existing record. This requires a primary key and that
    //! @[record] contains values for all primary key columns. If
    //! @[record] doesn't correspond to any existing record then nothing
    //! happens.
    //!
    //! Updating a record normally means that all fields in @[record]
    //! override those stored in the table row, while all other fields
    //! keep their values.
    //!
    //! It's the same for properties (i.e. fields that don't correspond to
    //! columns) which are stored in the @[prop_col] column. If that
    //! column needs to be updated then by default the old value is
    //! fetched first, which means the update isn't atomic in that case. A
    //! property can be removed altogether by giving it the value
    //! @[Val.null] in @[record].
    //!
    //! If @[clear_other_fields] is 1 then all old properties are replaced
    //! by the new ones instead of merged with them, which avoids the
    //! extra fetch. If @[clear_other_fields] is 2 then additionally all
    //! unmentioned columns are reset to their default values.
    //!
    //! For more details about the @[record] mapping, see @[insert].
    //!
    //! @seealso
    //! @[insert_or_update]
    {
      conn_update (get_db(), record, clear_other_fields);
    }
    
    int insert_or_update (mapping(string:mixed) record,
    		      void|int(0..2) clear_other_fields)
    //! Insert a record into the table using an INSERT ... ON DUPLICATE
    //! KEY UPDATE command: In case @[record] conflicts with an existing
    //! record then it is updated like the @[update] function would do,
    //! otherwise it is inserted like @[insert] would do.
    //!
    //! If @[id_col] is set and that column doesn't exist in @[record]
    //! then the field is added to the mapping with the value that the
    //! inserted or updated record got.
    //!
    //! @returns
    //! The value of the @[id_col] column for the new or updated record.
    //! Zero is returned if there is no @[id_col] column.
    //!
    //! @note
    //! This function isn't atomic if @[clear_other_fields] is unset and
    //! @[record] contains fields which do not correspond to real columns,
    //! i.e. if the @[prop_col] column may need to be updated.
    {
      return conn_insert_or_update (get_db(), record, clear_other_fields);
    }
    
    void delete (string|array where, void|string|array rest)
    //! Deletes records from the table that matches a condition.
    //!
    //! Both @[where] and @[rest] may be given as arrays to use bindings
    //! or @expr{sprintf@}-style formatting - see @[handle_argspec] for
    //! details.
    //!
    //! @param where
    //! The match condition, on the form of a WHERE expression.
    //!
    //! A WHERE clause will always be generated, but you can put e.g.
    //! "TRUE" in the match condition to select all records.
    //!
    //! @param rest
    //! Optional clauses that follows after the WHERE clause in a DELETE,
    //! i.e. ORDER BY and/or LIMIT.
    //!
    //! @seealso
    //! @[remove]
    //!
    //! @fixme
    //! Add support for joins.
    {
      conn_delete (get_db(), where, rest);
    }
    
    void remove (mixed id)
    //! Removes the record matched by the primary key value in @[id].
    //! Nothing happens if there is no such record.
    //!
    //! If the table has a multicolumn primary key then @[id] must be an
    //! array which has the values for the primary key columns in the same
    //! order as @[pk_cols]. Otherwise @[id] is taken directly as the
    //! value of the single primary key column.
    //!
    //! @seealso
    //! @[remove_multi], @[delete]
    {
      conn_remove (get_db(), id);
    }
    
    void remove_multi (array(mixed) ids)
    //! Removes multiple records selected by primary key values. It is not
    //! an error if some of the @[ids] elements don't match any records.
    //!
    //! This function currently only works if the primary key is a single
    //! column.
    //!
    //! @seealso
    //! @[remove]
    {
      conn_remove_multi (get_db(), ids);
    }
    
    Result select (string|array where, void|array(string) fields,
    	       void|string|array select_exprs, void|string table_refs,
    	       void|string|array rest, void|string select_flags)
    //! Retrieves all records that matches a condition.
    //!
    //! This function sends a SELECT statement, and it gives full
    //! expressive power to send any SELECT that is based on this table.
    //!
    //! The only functionality this function adds over
    //! @[Sql.big_typed_query] is conversion of TIMESTAMP values (see the
    //! class doc), and the (optional) handling of arbitrary properties in
    //! addition to the SQL columns. @[fields] may list such properties,
    //! and they are returned alongside the real columns. Properties
    //! cannot be used in WHERE expressions etc, though.
    //!
    //! Joins with other tables are possible through @[table_refs], but
    //! property columns in those tables aren't decoded.
    //!
    //! @param where
    //! The match condition, on the form of a WHERE expression. It may be
    //! given as an array to use bindings or @expr{sprintf@}-style
    //! formatting - see @[handle_argspec] for details.
    //!
    //! A WHERE clause will always be generated, but you can put e.g.
    //! "TRUE" in the match condition to select all records.
    //!
    //! @param fields
    //! The fields to retrieve. All fields are retrieved if it's zero or
    //! left out.
    //!
    //! For columns, the result mappings always have corresponding
    //! entries. Other fields, i.e. properties, only occur in the result
    //! mappings when they match fields in the @[prop_col] column.
    //!
    //! A @expr{0@} (zero) entry can be used in @[fields] to return all
    //! properties without filtering.
    //!
    //! @param select_exprs
    //! Optional extra select expression besides the requested columns.
    //! This is just added to the list of selected columns, after a
    //! separating ",".
    //!
    //! @[select_exprs] may be given as an array to use bindings or
    //! @expr{sprintf@}-style formatting - see @[handle_argspec] for
    //! details.
    //!
    //! Note that expressions in @[select_exprs] that produce TIMESTAMP
    //! values are not converted to unix time integers - they are instead
    //! returned as formatted date/time strings.
    //!
    //! @param table_refs
    //! Optional other tables to join into the SELECT. This is inserted
    //! between "FROM @[table]" and "WHERE".
    //!
    //! @param rest
    //! Optional clauses that follows after the WHERE clause, e.g. ORDER
    //! BY, GROUP BY, and LIMIT. It may be given as an array to use
    //! bindings or @expr{sprintf@}-style formatting - see
    //! @[handle_argspec] for details.
    //!
    //! @param select_flags
    //! Flags for the SELECT statement. If this string is given, it is
    //! simply inserted directly after the "SELECT" keyword.
    //!
    //! @returns
    //! Returns a @[SqlTable.Result] object from which the results can be
    //! retrieved. Zero is never returned.
    //!
    //! @note
    //! The result object implements an iterator, so it can be used
    //! directly in e.g. a @expr{foreach@}.
    //!
    //! @note
    //! @[quote] may be used to quote string literals if the
    //! @expr{sprintf@}-style formats aren't used.
    //!
    //! @seealso
    //! @[select1], @[get], @[get_multi]
    {
      return conn_select (get_db(), where, fields, select_exprs, table_refs,
    		      rest, select_flags);
    }
    
    array select1 (string|array select_expr, string|array where,
    	       void|string table_refs, void|string|array rest,
    	       void|string select_flags)
    //! Convenience variant of @[select] for retrieving only a single
    //! column. The return value is an array containing the values in the
    //! @[select_expr] column.
    //!
    //! @param select_expr
    //! The field to retrieve. It may name a column or a property, or it
    //! may be a select expression like "SHA1(x)". It may be given as an
    //! array to use bindings or @expr{sprintf@}-style formatting - see
    //! @[handle_argspec] for details.
    //!
    //! @param where
    //! The match condition, on the form of a WHERE expression. A WHERE
    //! clause will always be generated, but you can put e.g. "TRUE" in
    //! the match condition to select all records.
    //!
    //! @[where] may be given as an array to use bindings or
    //! @expr{sprintf@}-style formatting - see @[handle_argspec] for
    //! details.
    //!
    //! @param table_refs
    //! Optional other tables to join into the SELECT. This is inserted
    //! between "FROM @[table]" and "WHERE".
    //!
    //! @param rest
    //! Optional clauses that follows after the WHERE clause, e.g. ORDER
    //! BY, GROUP BY, and LIMIT. It may be given as an array to use
    //! bindings or @expr{sprintf@}-style formatting - see
    //! @[handle_argspec] for details.
    //!
    //! @param select_flags
    //! Flags for the SELECT statement. If this string is given, it is
    //! simply inserted directly after the "SELECT" keyword.
    //!
    //! @returns
    //! Returns an array with the values in the selected column. If a
    //! property is retrieved and some rows don't have the wanted property
    //! then @[UNDEFINED] is put into those elements.
    //!
    //! @seealso
    //! @[select], @[get], @[get_multi]
    {
      return conn_select1 (get_db(), select_expr, where, table_refs,
    		       rest, select_flags);
    }
    
    mapping(string:mixed) get (mixed id, void|array(string) fields)
    //! Returns the record matched by a primary key value, or zero if
    //! there is no such record.
    //!
    //! @param id
    //! The value for the primary key.
    //!
    //! If the table has a multicolumn primary key then @[id] must be an
    //! array which has the values for the primary key columns in the same
    //! order as @[pk_cols]. Otherwise @[id] is taken directly as the
    //! value of the single primary key column.
    //!
    //! @param fields
    //! The fields to retrieve. All fields are retrieved if it's zero or
    //! left out.
    //!
    //! For columns, the result mappings always have corresponding
    //! entries. Other fields, i.e. properties, only occur in the result
    //! mappings when they match fields in the @[prop_col] column.
    //!
    //! A @expr{0@} (zero) entry can be used in @[fields] to return all
    //! properties without filtering.
    //!
    //! @seealso
    //! @[select], @[select1], @[get_multi]
    {
      return conn_get (get_db(), id, fields);
    }
    
    Result get_multi (array(mixed) ids, void|array(string) fields)
    //! Retrieves multiple records selected by primary key values.
    //!
    //! This function currently only works if the primary key is a single
    //! column.
    //!
    //! @param id
    //! Array containing primary key values.
    //!
    //! The number of returned records might be less than the number of
    //! entries here if some of them don't match any record. Also, the
    //! order of the returned records has no correlation to the order in
    //! the @[id] array.
    //!
    //! @param fields
    //! The fields to retrieve. All fields are retrieved if it's zero or
    //! left out.
    //!
    //! For columns, the result mappings always have corresponding
    //! entries. Other fields, i.e. properties, only occur in the result
    //! mappings when they match fields in the @[prop_col] column.
    //!
    //! A @expr{0@} (zero) entry can be used in @[fields] to return all
    //! properties without filtering.
    //!
    //! @returns
    //! Returns a @[SqlTable.Result] object from which the results can be
    //! retrieved. Zero is never returned.
    //!
    //! @note
    //! The result object implements an iterator, so it can be used
    //! directly in e.g. a @expr{foreach@}.
    //!
    //! @seealso
    //! @[get], @[select], @[select1]
    {
      return conn_get_multi (get_db(), ids, fields);
    }
    
    // Explicit connection handling.
    
    int conn_insert (Sql.Sql db_conn, mapping(string:mixed)... records)
    //! Like @[insert], but a database connection object is passed
    //! explicitly instead of being retrieved via @[get_db].
    {
      Sql.mysql conn = db_conn->master_sql;
    #ifdef DEBUG
      if (!sizeof (records)) error ("Must give at least one record.\n");
    #endif
      UPDATE_MSG ("%O: insert %O\n",
    	      this, sizeof (records) == 1 ? records[0] : records);
      mapping(string:mixed) first_rec = records[0];
      conn->big_query ("INSERT `" + table + "` " +
    		   make_insert_clause (records),
    		   0, query_charset);
      if (!id_col) return 0;
      if (zero_type (first_rec[id_col]))
        return first_rec[id_col] = conn->insert_id();
      else
        return first_rec[id_col];
    }
    
    int conn_insert_ignore (Sql.Sql db_conn, mapping(string:mixed)... records)
    //! Like @[insert_ignore], but a database connection object is passed
    //! explicitly instead of being retrieved via @[get_db].
    {
      Sql.mysql conn = db_conn->master_sql;
    #ifdef DEBUG
      if (!sizeof (records)) error ("Must give at least one record.\n");
    #endif
      UPDATE_MSG ("%O: insert_ignore %O\n",
    	      this, sizeof (records) == 1 ? records[0] : records);
      mapping(string:mixed) first_rec = records[0];
      conn->big_query ("INSERT IGNORE `" + table + "` " +
    		   make_insert_clause (records),
    		   0, query_charset);
      if (!id_col) return 0;
      int last_insert_id = conn->insert_id();
      if (last_insert_id &&
          sizeof (records) == 1 && zero_type (first_rec[id_col]))
        // Only set the field if we got a single record. Otherwise we
        // don't really know which record it applies to.
        first_rec[id_col] = last_insert_id;
      return last_insert_id;
    }
    
    int conn_replace (Sql.Sql db_conn, mapping(string:mixed)... records)
    //! Like @[replace], but a database connection object is passed
    //! explicitly instead of being retrieved via @[get_db].
    {
      Sql.mysql conn = db_conn->master_sql;
    #ifdef DEBUG
      if (!sizeof (records)) error ("Must give at least one record.\n");
    #endif
      UPDATE_MSG ("%O: replace %O\n",
    	      this, sizeof (records) == 1 ? records[0] : records);
      mapping(string:mixed) first_rec = records[0];
      conn->big_query ("REPLACE `" + table + "` " +
    		   make_insert_clause (records),
    		   0, query_charset);
      if (!id_col) return 0;
      if (zero_type (first_rec[id_col]))
        return first_rec[id_col] = conn->insert_id();
      else
        return first_rec[id_col];
    }
    
    void conn_update (Sql.Sql db_conn, mapping(string:mixed) record,
    		  void|int(0..2) clear_other_fields)
    //! Like @[update], but a database connection object is passed
    //! explicitly instead of being retrieved via @[get_db].
    {
      Sql.mysql conn = db_conn->master_sql;
    #ifdef DEBUG
      if (!(<0,1,2>)[clear_other_fields])
        error ("Invalid clear_other_fields flag.\n");
    #endif
      UPDATE_MSG ("%O: update%s %O\n", this,
    	      clear_other_fields == 2 ? " (clear all other fields)" :
    	      clear_other_fields == 1 ? " (clear other props)" : "", record);
    
      record += ([]);
      string pk_where = make_pk_where (record);
      if (!pk_where)
        error ("The record lacks a value for a primary key column.\n");
      record = update_pack_fields (conn, record, pk_where, clear_other_fields);
    
      conn->big_query ("UPDATE `" + table + "` "
    		   "SET " + make_set_clause (record,
    					     clear_other_fields == 2) + " "
    		   "WHERE " + pk_where + " "
    		   "LIMIT 1",	// The limit is just extra paranoia.
    		   0, query_charset);
    }
    
    int conn_insert_or_update (Sql.Sql db_conn, mapping(string:mixed) record,
    			   void|int(0..2) clear_other_fields)
    //! Like @[insert_or_update], but a database connection object is
    //! passed explicitly instead of being retrieved via @[get_db].
    {
      Sql.mysql conn = db_conn->master_sql;
    #ifdef DEBUG
      if (!(<0,1,2>)[clear_other_fields])
        error ("Invalid clear_other_fields flag.\n");
    #endif
      UPDATE_MSG ("%O: insert_or_update%s %O\n", this,
    	      clear_other_fields == 2 ? " (clear all other fields)" :
    	      clear_other_fields == 1 ? " (clear other props)" : "", record);
    
      // If we have properties to merge then do that with separate queries
      // afterwards. We never do it by first querying the properties from
      // an existing record (if any), even though that could save one
      // query. That's because the race is less bothersome if we do it
      // afterwards based on id_col than before based on pk_cols (or
      // possibly all fields if pk_cols aren't given) - in the latter case
      // we could retrieve the properties from a different record that got
      // removed in the race window. Doing it afterwards might change the
      // wrong record only if id_col isn't used.
    
      mapping(string:mixed) real_cols = col_types & record;
      mapping(string:mixed) other_fields = record - real_cols;
      string prop_col_value;
    
      if (sizeof (other_fields)) {
    #ifdef DEBUG
        if (!prop_col) error ("Column(s) %s missing in table %O.\n",
    			  String.implode_nicely (indices (other_fields)),
    			  table);
    #endif
        // These encoded properties are normally for the INSERT clause
        // only - not used on update. That way we can avoid the two extra
        // queries in the INSERT case, but otoh that query gets larger.
        // It's used in both cases if clear_other_fields is set, though.
        prop_col_value = get_and_merge_props (0, 0, other_fields);
      }
      else if (clear_other_fields && prop_col)
        prop_col_value = "";
    
      array(string) update_set = allocate (sizeof (real_cols));
      {
        int i;
        foreach (real_cols; string col;)
          update_set[i++] = "`" + col + "`=VALUES(`" + col + "`)";
      }
    
      if (prop_col_value && zero_type (real_cols[prop_col])) {
        if (clear_other_fields)
          update_set += ({"`" + prop_col + "`=VALUES(`" + prop_col + "`)"});
        real_cols[prop_col] = prop_col_value;
      }
    
      if (id_col && zero_type (real_cols[id_col]))
        update_set += ({"`" + id_col + "`=LAST_INSERT_ID(`" + id_col + "`)"});
    
      if (clear_other_fields == 2) {
        foreach (col_types - real_cols - ({id_col}); string col;)
          update_set += ({"`" + col + "`=DEFAULT(`" + col + "`)"});
      }
    
      conn->big_query ("INSERT `" + table + "` " +
    		   make_insert_clause (({real_cols})) + " " +
    		   (sizeof (update_set) ?
    		    "ON DUPLICATE KEY UPDATE " + update_set * "," : ""),
    		   0, query_charset);
    
      if (id_col && zero_type (record[id_col]))
        record[id_col] = conn->insert_id();
    
      if (sizeof (other_fields) && !clear_other_fields &&
          // affected_rows() returns 2 if a record was updated, 1 if a new
          // one was added, and 0 if nothing happened. We should merge in
          // the property changes for both 2 and 0.
          conn->affected_rows() != 1) {
        string where = make_pk_where (record + ([]));
        if (!where) {
          // There's no AUTO_INCREMENT id column and the record doesn't
          // contain values for the primary keys, so some of the columns
          // matched an existing record through another unique index.
          // Since we don't know which they are we have to match all the
          // columns in the WHERE clause. FIXME: Add tracking of unique
          // indices to avoid this.
          String.Buffer buf = String.Buffer();
          int first = 1;
          foreach (real_cols; string col_name; mixed val)
    	if (col_name != prop_col) {
    	  if (first) first = 0; else buf->add (" AND ");
    	  buf->add ("`", col_name, "`=");
    	  add_mysql_value (buf, col_name, val);
    	}
        }
        update_props (conn, where, other_fields);
      }
    
      return id_col && record[id_col];
    }
    
    void conn_delete (Sql.Sql db_conn, string|array where, void|string|array rest)
    //! Like @[delete], but a database connection object is passed
    //! explicitly instead of being retrieved via @[get_db].
    {
      UPDATE_MSG ("%O: delete WHERE (%O)%s\n", this, where,
    	      rest ? sprintf (" %O", rest) : "");
    
      mapping(string|int:mixed) bindings = ([]);
      if (arrayp (where)) where = handle_argspec (where, bindings);
      if (arrayp (rest)) rest = handle_argspec (rest, bindings);
      if (!sizeof (bindings)) bindings = 0;
    
      db_conn->master_sql->big_query ("DELETE FROM `" + table + "` "
    				  "WHERE (" + where + ") " + (rest || ""),
    				  bindings);
    }
    
    void conn_remove (Sql.Sql db_conn, mixed id)
    //! Like @[remove], but a database connection object is passed
    //! explicitly instead of being retrieved via @[get_db].
    {
      UPDATE_MSG ("%O: remove %O\n", this, id);
      Sql.mysql conn = db_conn->master_sql;
      conn->big_query ("DELETE FROM `" + table + "` "
    		   "WHERE " + simple_make_pk_where (id),
    		   0, query_charset);
    }
    
    void conn_remove_multi (Sql.Sql db_conn, array(mixed) ids)
    //! Like @[remove_multi], but a database connection object is passed
    //! explicitly instead of being retrieved via @[get_db].
    {
      UPDATE_MSG ("%O: remove_multi %{%O,%}\n", this, ids);
      Sql.mysql conn = db_conn->master_sql;
      // FIXME: Split into several queries if the list is very long.
      conn->big_query ("DELETE FROM `" + table + "` "
    		   "WHERE " + make_multi_pk_where (ids),
    		   0, query_charset);
    }
    
    Result conn_select (Sql.Sql db_conn, string|array where,
    		    void|array(string) fields, void|string|array select_exprs,
    		    void|string table_refs, void|string|array rest,
    		    void|string select_flags)
    //! Like @[select], but a database connection object is passed
    //! explicitly instead of being retrieved via @[get_db].
    {
      mapping(string:mixed) bindings = ([]);
      if (arrayp (where)) where = handle_argspec (where, bindings);
      if (arrayp (select_exprs))
        select_exprs = handle_argspec (select_exprs, bindings);
      if (arrayp (rest)) rest = handle_argspec (rest, bindings);
      if (!sizeof (bindings)) bindings = 0;
    
      Result res = Result();
    
      string query = "SELECT ";
      if (select_flags) query += select_flags + " ";
      query += res->prepare_select_expr (fields, select_exprs, !!table_refs) +
        " FROM `" + table + "` " + (table_refs || "");
    
      res->res = db_conn->master_sql->big_typed_query (
        query + " WHERE (" + where + ") " + (rest || ""), bindings);
    
      return res;
    }
    
    array conn_select1 (Sql.Sql db_conn, string|array select_expr,
    		    string|array where, void|string table_refs,
    		    void|string|array rest, void|string select_flags)
    //! Like @[select1], but a database connection object is passed
    //! explicitly instead of being retrieved via @[get_db].
    {
      mapping(string:mixed) bindings = ([]);
      if (arrayp (select_expr))
        select_expr = handle_argspec (select_expr, bindings);
      if (arrayp (where)) where = handle_argspec (where, bindings);
      if (arrayp (rest)) rest = handle_argspec (rest, bindings);
      if (!sizeof (bindings)) bindings = 0;
    
      string property;
      string col_type = col_types[select_expr];
      if (!col_type && prop_col &&
          sscanf (select_expr, "%*[^ .(]%*1[ .(]") != 2) {
        property = select_expr;
        select_expr = prop_col;
        col_type = "string";
      }
    
      string query = "SELECT ";
      if (select_flags) query += select_flags + " ";
      if (timestamp_cols[select_expr])
        query += "UNIX_TIMESTAMP(`" + table + "`.`" + select_expr + "`)";
      else if (col_type)
        query += "`" + table + "`.`" + select_expr + "`";
      else
        query += select_expr;
      query += " FROM `" + table + "` " + (table_refs || "");
    
      Sql.mysql_result res = db_conn->master_sql->big_typed_query (
        query + " WHERE (" + where + ") " + (rest || ""), bindings);
    
    #ifdef DEBUG
      if (res->num_fields() != 1)
        error ("Result from %O did not contain a single field (got %d fields).\n",
    	   query, res->num_fields());
    #endif
    
      array ret = allocate (res->num_rows());
    
      if (property) {
        int i = 0;
        while (array(string) ent = res->fetch_row())
          ret[i++] = decode_props (ent[0], where)[property];
      }
    
      else {
        int i = 0;
        while (array(mixed) ent = res->fetch_row()) {
          ret[i++] = ent[0];
        }
      }
    
      return ret;
    }
    
    mapping(string:mixed) conn_get (Sql.Sql db_conn, mixed id,
    				void|array(string) fields)
    //! Like @[get], but a database connection object is passed explicitly
    //! instead of being retrieved via @[get_db].
    {
      Sql.mysql conn = db_conn->master_sql;
    
    #ifdef DEBUG
      if (fields && !sizeof (fields)) error ("No fields selected.\n");
    #endif
    
      int want_all = !fields;
      if (want_all) {
        if (prop_col)
          // Some dwim: Probably don't want the prop_col value verbatim as
          // well in the result.
          fields = indices (col_types - ([prop_col: ""]));
        else
          fields = indices (col_types);
      }
    
      mapping(string:string) real_cols = col_types & fields;
      mapping(string:string) other_fields;
    
      if (sizeof (real_cols) < sizeof (fields)) {
        if (!has_value (fields, 0)) {
          mapping(string:string) field_map = mkmapping (fields, fields);
          other_fields = field_map - real_cols;
        }
    #ifdef DEBUG
        if (!prop_col)
          error ("Requested nonexisting column(s) %s.\n",
    	     String.implode_nicely (indices (other_fields ||
    					     ([prop_col: ""]))));
    #endif
      }
      else if (!want_all)
        other_fields = ([]);
    
      int exclude_prop_col;
      if (prop_col && !equal (other_fields, ([]))) {
        if (!real_cols[prop_col]) exclude_prop_col = 1;
        real_cols[prop_col] = "string";
      }
    
      array(string) real_col_names = indices (real_cols);
      array(string) real_col_types = values (real_cols);
    
      array(string) select_cols = allocate (sizeof (real_col_types));
      foreach (real_col_names; int i; string name) {
        if (timestamp_cols[name])
          select_cols[i] = "UNIX_TIMESTAMP(`" + name + "`)";
        else
          select_cols[i] = "`" + name + "`";
      }
    
      string pk_where = simple_make_pk_where (id);
      Mysql.mysql_result res =
        conn->big_typed_query ("SELECT " + (select_cols * ",") + " "
    			   "FROM `" + table + "` "
    			   "WHERE " + pk_where,
    			   0, query_charset);
    
      array(string) ent = res->fetch_row();
      if (!ent) return 0;
    
      mapping(string:mixed) rec = ([]);
    
      foreach (ent; int i; string|int|float|Sql.Null val) {
        rec[real_col_names[i]] = val;
      }
    
      if (prop_col && !equal (other_fields, ([]))) {
        string prop_val = rec[prop_col];
        if (prop_val != 0 && prop_val != "") {
          mapping(string:mixed) props = decode_props (prop_val, pk_where);
          if (other_fields) props = other_fields & props;
          // Properties take precedence over columns. That allows gradual
          // migration to columns, since the field in prop_col will
          // disappear in the next update.
          rec += props;
        }
        if (exclude_prop_col) m_delete (rec, prop_col);
      }
    
      return rec;
    }
    
    Result conn_get_multi (Sql.Sql db_conn, array(mixed) ids,
    		       void|array(string) fields)
    //! Like @[get_multi], but a database connection object is passed
    //! explicitly instead of being retrieved via @[get_db].
    {
      Sql.mysql conn = db_conn->master_sql;
      Result res = Result();
      // FIXME: Split into several queries if the list is very long.
      res->res =
        conn->big_typed_query ("SELECT " +
    			   res->prepare_select_expr (fields, 0, 0) +
    			   " FROM `" + table + "` "
    			   "WHERE " + make_multi_pk_where (ids),
    			   0, query_charset);
      return res;
    }
    
    // The Result object.
    
    class Result
    //! Result object returned by e.g. @[select]. This is similar in
    //! function to an @[Sql.sql_result] object. It also implements the
    //! iterator interface and can therefore be used directly in e.g.
    //! @expr{foreach@}.
    {
      Sql.mysql_result res;
      //! The underlying result object from the db connection.
    
      protected array(string) real_col_names;
      // The names of the sql columns to retrieve. Initially this contains
      // only the column names built from fields, not the names of the
      // user supplied select expressions. The latter are added when the
      // first row is fetched.
      //
      // If properties are being fetched but not the property column
      // itself then real_col_names has a zero in the prop_pos position.
    
      protected int add_select_expr_names;
      // Nonzero if there are user supplied select expressions and their
      // names have not yet been added to real_col_names.
    
      protected int prop_pos;
      // Position of the prop_col values in the result arrays, or -1 if we
      // shouldn't handle that.
    
      protected mapping(string:string) other_fields;
      // Selected fields in the prop_col values, or zero if everything
      // is wanted. The values are insignificant.
    
      protected int cur_row;
      protected mapping(string:mixed) cur_rec;
      // Need to store the last fetched result for the iterator interface
      // since fetch() advances the cursor, while the iterator's index()
      // and value() must not. These are updated by fetch(), and cur_row
      // is the row index of cur_rec, _not_ the next one.
    
      protected string _sprintf (int flag)
      {
        return flag == 'O' &&
          sprintf ("SqlTable.Result(%O, %d/%d)",
    	       global::this && table, cur_row, res && res->num_rows());
      }
    
      int num_rows() {return res->num_rows();}
      //! Returns the number of rows in the result.
    
      int eof() {return res->eof();}
      //! Returns nonzero if there are no more rows.
    
      array(mapping(string:mixed)) column_info() {return res->fetch_fields();}
      //! Returns information about the columns in the result.
    
      mapping(string:mixed) fetch()
      //! Fetches the next record from the result and advance the cursor.
      //! Returns zero if there are no more records.
      //!
      //! The record is returned as a mapping. It is similar to the
      //! mappings returned by @[Sql.query], except that native pike types
      //! and @[Val.null] are used. If @[prop_col] is used then properties
      //! from that column can be returned as mapping entries alongside
      //! the columns, and those values can be any pike data type.
      //!
      //! As opposed to the @[Sql.query] mappings, the returned mapping
      //! has a single entry for each field - there are no duplicate
      //! entries prefixed with the table name.
      {
        if (cur_rec) cur_row++;
    
        array(string) ent = res->fetch_row();
        if (!ent) return cur_rec = 0;
    
        mapping(string:mixed) rec;
    
        if (add_select_expr_names) {
          // Fetch the names of the user supplied select expressions from
          // the result.
          array(mapping(string:mixed)) field_info =
    	res->fetch_fields()[sizeof (real_col_names)..];
          real_col_names += column (field_info, "name");
          add_select_expr_names = 0;
        }
    
        rec = mkmapping (real_col_names, ent);
    
        if (prop_pos != -1) {
          m_delete (rec, 0);
          if (!(<0, "">)[ent[prop_pos]]) {
    	mapping(string:mixed) props = decode_props (ent[prop_pos], 0);
    	if (other_fields) props = other_fields & props;
    	// Properties take precedence over columns. That allows
    	// gradual migration to columns, since the field in prop_col
    	// will disappear in the next update.
    	rec += props;
          }
        }
    
        return cur_rec = rec;
      }
    
      array(mapping(string:mixed)) get_array()
      //! Returns all the remaining records as an array of mappings.
      //! @[eof] returns true after this.
      //!
      //! @note
      //! This is not a cast since it destructively modifies this object
      //! by fetching all remaining records.
      {
        array(mapping(string:mixed)) res = ({});
        while (mapping(string:mixed) rec = fetch())
          res += ({rec});
        return res;
      }
    
      string prepare_select_expr (array(string) fields, string select_exprs,
    			      int with_table_qualifiers)
      // Internal function to initialize all the variables from an
      // optional array of requested fields.
      {
    #ifdef DEBUG
        if (fields && !sizeof (fields)) error ("No fields selected.\n");
    #endif
    
        string select_clause;
        string tbl_qual = with_table_qualifiers ? table + "`.`" : "";
        int want_all = !fields;
    
        prop_pos = -1;
    
        if (want_all) {
          if (prop_col)
    	// Some dwim: Probably don't want the prop_col value verbatim
    	// as well in the result.
    	fields = indices (col_types - ([prop_col: ""]));
          else
    	fields = indices (col_types);
        }
    
        mapping(string:string) real_cols = col_types & fields;
        if (sizeof (real_cols)) {
          mapping(string:int(1..1)) ts_cols = timestamp_cols & fields;
          if (sizeof (ts_cols)) {
    	real_col_names = indices (ts_cols);
    	select_clause =
    	  "UNIX_TIMESTAMP(`" + tbl_qual +
    	  (real_col_names * ("`),UNIX_TIMESTAMP(`" + tbl_qual)) + "`)";
    	if (sizeof (ts_cols) < sizeof (real_cols)) {
    	  array(string) normal_col_names = indices (real_cols - ts_cols);
    	  real_col_names += normal_col_names;
    	  select_clause +=
    	    ",`" + tbl_qual + (normal_col_names * ("`,`" + tbl_qual)) + "`";
    	}
          }
          else {
    	real_col_names = indices (real_cols);
    	select_clause =
    	  "`" + tbl_qual + (real_col_names * ("`,`" + tbl_qual)) + "`";
          }
        }
    
        if (sizeof (real_cols) < sizeof (fields)) {
          if (!has_value (fields, 0)) {
    	mapping(string:string) field_map = mkmapping (fields, fields);
    	other_fields = field_map - real_cols;
          }
    #ifdef DEBUG
          if (!prop_col)
    	error ("Requested nonexisting column(s) %s.\n",
    	       String.implode_nicely (indices (other_fields ||
    					       ([prop_col: ""]))));
    #endif
        }
        else if (!want_all)
          other_fields = ([]);
    
        if (prop_col && !equal (other_fields, ([]))) {
          if (real_cols[prop_col])
    	prop_pos = search (real_col_names, prop_col);
          else {
    	prop_pos = sizeof (real_col_names);
    	if (select_clause) select_clause += ",`" + tbl_qual + prop_col + "`";
    	else select_clause = "`" + tbl_qual + prop_col + "`";
    	real_col_names += ({0});
          }
        }
    
        if (select_exprs) {
          add_select_expr_names = 1;
          if (select_clause) select_clause += ", " + select_exprs;
          else select_clause = select_exprs;
        }
    
        return select_clause || "";
      }
    
      // Iterator interface. This is a separate object only to avoid
      // implementing a `! in Result, which would make it behave oddly.
    
    #ifdef DEBUG
      protected int got_iterator;
    #endif
    
      Iterator _get_iterator()
      //! Returns an iterator for the result. Only one iterator may be
      //! created per @[Result] object.
      {
    #ifdef DEBUG
        if (got_iterator)
          error ("Cannot create more than one iterator for a Result object.\n");
        got_iterator = 1;
    #endif
        return Iterator (num_rows());
      }
    
      protected class Iterator (protected int cached_num_rows)
      {
        protected int `!() {return cur_row >= cached_num_rows;}
        protected int _sizeof() {return cached_num_rows;}
    
        protected string _sprintf (int flag)
        {
          return flag == 'O' &&
    	sprintf ("SqlTable.Result.Iterator(%O, %d/%d)",
    		 Result::this && global::this && table,
    		 Result::this && cur_row, cached_num_rows);
        }
    
        int index()
        {
          return cur_row < cached_num_rows ? cur_row : UNDEFINED;
        }
    
        mapping(string:mixed) value()
        {
          if (!cur_rec) fetch();
          return cur_row < cached_num_rows ? cur_rec : UNDEFINED;
        }
    
        int next()
        {
          fetch();
          return cur_row < cached_num_rows;
        }
    
        protected Iterator `+= (int steps)
        // Old interface for pike 7.4 compat.
        {
          if (steps < 0) error ("Stepping backwards not supported.\n");
          while (steps--) if (!next()) break;
          return this;
        }
      }
    }
    
    // Internals.
    
    protected string format_rec_compact (mapping(string:mixed) rec)
    {
      array(string) vars = indices (rec);
      array(mixed) vals = values (rec);
      sort (vars, vals);
      array(string) rows = allocate (sizeof (vars));
      foreach (vals; int i; mixed val) {
        if (stringp (val) || arrayp (val) || mappingp (val))
          rows[i] = sprintf ("  %O: %t[%d],\n", vars[i], val, sizeof (val));
        else
          rows[i] = sprintf ("  %O: %O,\n", vars[i], val);
      }
      return "([\n" + rows * "" + "])";
    }
    
    protected mapping(string:mixed) decode_props (string prop_val, string where)
    {
      if (prop_val != 0 && prop_val != "") {
        mixed decoded;
        mixed err = catch (decoded = decode_value (prop_val));
        if (err || !mappingp (decoded))
          // This is bad, but the best we can do is to report it on stderr
          // and continue so that the bogus value gets flushed out and the
          // app can recover somewhat. FIXME: Maybe add a flag to control
          // what to do here.
          werror ("WARNING! Failed to decode property value "
    	      "in `%s` from `%s`%s: %s"
    	      "The garbled property value is: %O\n"
    	      "It will be ignored and overwritten.\n",
    	      prop_col, table, where ? " where " + where : "",
    	      (err ? describe_error (err) :
    	       sprintf ("Expected mapping, got %O.\n", decoded)),
    	      prop_val);
        else
          return [mapping(string:mixed)] decoded;
      }
      return ([]);
    }
    
    protected void add_mysql_value (String.Buffer buf, string col_name, mixed val)
    // A value with zero_type is formatted as "DEFAULT".
    {
      if (stringp (val)) {
    #ifdef DEBUG
        if (col_types[col_name] != "string")
          error ("Got string value %q for %s column `%s`.\n",
    	     val, col_types[col_name], col_name);
    #endif
        if (String.width (val) == 8)
          // _latin1 works fine for binary data since the actual charset
          // isn't significant. The only problem is for 8-bit text where
          // mysql latin1 has chars in the control range 0x80..0x9f. Since
          // those control chars are very uncommon in text we just ignore
          // that problem for now.
          buf->add ("_latin1\"", quote (val), "\"");
        else
          // FIXME: If the column holds binary data we should throw an
          // error here instead of sending what is effectively garbled
          // data.
          buf->add ("_utf8\"", string_to_utf8 (quote (val)), "\"");
      }
      else if (intp (val)) {
        if (zero_type (val))
          buf->add ("DEFAULT");
        else {
    #ifdef DEBUG
          if (col_types[col_name] != "int" && !datetime_cols[col_name])
    	error ("Got integer value %O for %s column `%s`.\n",
    	       val, col_types[col_name] || "string", col_name);
    #endif
          if (timestamp_cols[col_name])
    	buf->add ("FROM_UNIXTIME(", (string) val, ")");
          else
    	buf->add ((string) val);
        }
      }
      else if (val == Val.null)
        buf->add ("NULL");
      else {
    #ifdef DEBUG
        if (objectp (val) && functionp (val->den)) {
          // Allow Gmp.mpq for float fields, and for int fields if they
          // have no fractional part.
          if (col_types[col_name] != "float" &&
    	  !(val->den() == 1 && col_types[col_name] == "int"))
    	error ("Got %O for %s column `%s`.\n",
    	       val, col_types[col_name] || "string", col_name);
        }
        else {
          if (!floatp (val))
    	error ("Cannot use %t value for `%s`: %O\n", val, col_name, val);
          if (col_types[col_name] != "float")
    	error ("Got float value %O for %s column `%s`.\n",
    	       val, col_types[col_name] || "string", col_name);
        }
    #endif
        buf->add ((string) val);
      }
    }
    
    protected string make_insert_clause (array(mapping(string:mixed)) records)
    // Returns an "(a,b,c) VALUES (1,2,3),(4,5,6)" clause as used in
    // INSERT statements. Fields that don't have columns are packed into
    // prop_col updates. Destructive on the records array, but not on the
    // mapping elements.
    {
      mapping(string:mixed) query_cols = ([]); // Only indices are relevant.
    
      // FIXME: Ought to use bindings, but Mysql.mysql doesn't support it
      // yet (as of pike 7.8.191).
    
    #ifdef DEBUG
      if (!sizeof (records)) error ("Must give at least one record.\n");
    #endif
    
      foreach (records; int i; mapping(string:mixed) rec) {
        mapping(string:mixed) real_cols = col_types & rec;
        mapping(string:mixed) other_fields = rec - real_cols;
    
        if (sizeof (other_fields)) {
    #ifdef DEBUG
          if (!prop_col) error ("Column(s) %s missing.\n",
    			    String.implode_nicely (indices (other_fields)));
    #endif
    
          string encoded_props;
          if (mixed err = catch {
    	  encoded_props = encode_value_canonic (other_fields);
    	})
    	error ("Failed to encode properties: %sThe properties are: %O\n",
    	       describe_error (err), other_fields);
    
          if (sizeof (encoded_props) > prop_col_max_length)
    	error ("Property value too long (%d b - max is %d b) "
    	       "for `%s` in `%s` - failed to insert record: %s\n",
    	       sizeof (encoded_props), prop_col_max_length, prop_col, table,
    	       format_rec_compact (rec));
          real_cols[prop_col] = encoded_props;
        }
    
        query_cols |= real_cols;
        records[i] = real_cols;
      }
    
      array(string) col_list = indices (query_cols);
      String.Buffer buf = String.Buffer();
      if (sizeof (col_list))
        buf->add ("(`", (col_list * "`,`"), "`) VALUES ");
      else
        buf->add ("() VALUES ");
    
      int first = 1;
      foreach (records, mapping(string:mixed) rec) {
        if (first) first = 0; else buf->add (",");
        buf->add ("(");
    
        int col_first = 1;
        foreach (map (col_list, rec); int i; mixed val) {
          if (col_first) col_first = 0; else buf->add (",");
          add_mysql_value (buf, col_list[i], val);
        }
    
        buf->add (")");
      }
    
      return buf->get();
    }
    
    protected string make_pk_where (mapping(string:mixed) rec)
    // Returns a WHERE expression like "a=1 AND b=2" for matching the
    // primary key, or zero if the record doesn't have values for all pk
    // columns. The pk fields are also removed from the rec mapping.
    {
    #ifdef DEBUG
      if (!sizeof (pk_cols)) error ("There is no primary key in this table.\n");
    #endif
      String.Buffer buf = String.Buffer();
      int first = 1;
      foreach (pk_cols, string pk_col) {
        if (first) first = 0; else buf->add (" AND ");
        mixed val = m_delete (rec, pk_col);
        if (zero_type (val) || val == Val.null)
          return 0;
        buf->add ("`", pk_col, "`=");
        add_mysql_value (buf, pk_col, val);
      }
      return buf->get();
    }
    
    protected string simple_make_pk_where (mixed id)
    // Returns a WHERE expression like "a=1 AND b=2" for matching the
    // primary key. id is like the argument to get().
    {
    #ifdef DEBUG
      if (!sizeof (pk_cols)) error ("There is no primary key in this table.\n");
    #endif
    
      String.Buffer buf = String.Buffer();
      if (sizeof (pk_cols) == 1) {
        buf->add ("`", pk_cols[0], "`=");
    #ifdef DEBUG
        if (id == Val.null)
          error ("Cannot use Val.null for primary key column %O.\n",
    	     pk_cols[0]);
    #endif
        add_mysql_value (buf, pk_cols[0],
    		     id || 0); // Clear any zero_type in id.
      }
    
      else {
    #ifdef DEBUG
        if (!arrayp (id) || sizeof (id) != sizeof (pk_cols))
          error ("The id must be an array with %d elements.\n", sizeof (pk_cols));
    #endif
        int first = 1;
        foreach (pk_cols; int i; string pk_col) {
          if (first) first = 0; else buf->add (" AND ");
          buf->add ("`", pk_cols[0], "`=");
    #ifdef DEBUG
          if (id[i] == Val.null)
    	error ("Cannot use Val.null for primary key column %O.\n", pk_col);
    #endif
          add_mysql_value (buf, pk_col,
    		       id[i] || 0); // Clear any zero_type in id[i].
        }
      }
    
      return buf->get();
    }
    
    protected string make_multi_pk_where (array(mixed) ids, int|void negated)
    // Returns a WHERE expression like "foo IN (2,3,17,4711)" for matching
    // a bunch of records by primary key.
    {
    #ifdef DEBUG
      if (sizeof (pk_cols) != 1)
        error ("The table must have a single column primary key.\n");
    #endif
    
      if (!sizeof (ids)) return negated?"TRUE":"FALSE";
    
      string pk_col = pk_cols[0];
      string pk_type = col_types[pk_col];
    
      string optional_not = negated?" NOT ":"";
    
      if ((<"float", "int">)[pk_type]) {
    #ifdef DEBUG
        foreach (ids; int i; mixed id)
          if (!intp (id) && !floatp (id))
    	error ("Expected numeric value for primary key column %O, "
    	       "got %t at position %d.\n",
    	       pk_col, id, i);
    #endif
        return (timestamp_cols[pk_col] ?
    	    "UNIX_TIMESTAMP(`" + pk_col + "`)" : "`" + pk_col + "`") +
          optional_not + " IN (" + (((array(string)) ids) * ",") + ")";
      }
    
      else {
    #ifdef DEBUG
        foreach (ids; int i; mixed id)
          if (!stringp (id))
    	error ("Expected string value for primary key column %O, "
    	       "got %t at position %d.\n",
    	       pk_col, id, i);
    #endif
        String.Buffer buf = String.Buffer();
        buf->add ("`", pk_col, "` ", optional_not, " IN (");
        int first = 1;
        foreach (ids, string id) {
          if (first) first = 0; else buf->add (",");
          add_mysql_value (buf, pk_col, id);
        }
        buf->add (")");
        return buf->get();
      }
    }
    
    protected string get_and_merge_props (Sql.mysql conn, string pk_where,
    				      mapping(string:mixed) prop_changes)
    // Retrieves the current properties for a record (if pk_where is set),
    // merges prop_changes into them, and returns the new value to assign
    // to the properties column.
    {
      mapping(string:mixed) old_props;
      if (array(string) ent = pk_where &&
          conn->big_query ("SELECT `" + prop_col + "` "
    		       "FROM `" + table + "` "
    		       "WHERE " + pk_where,
    		       0, query_charset)->fetch_row())
        old_props = decode_props (ent[0], pk_where);
      else
        old_props = ([]);
    
      mapping(string:mixed) rem_props = filter (prop_changes, `==, Val.null);
      mapping(string:mixed) new_props = old_props + prop_changes - rem_props;
    
      string encoded_props;
      if (mixed err = catch {
          encoded_props = encode_value_canonic (new_props);
        })
        error ("Failed to encode properties: %sThe properties are: %O\n",
    	   describe_error (err), new_props);
    
      if (sizeof (encoded_props) > prop_col_max_length)
        error ("Property value too long (%d b - max is %d b) "
    	   "for `%s` in `%s`%s: %s",
    	   sizeof (encoded_props), prop_col_max_length,
    	   prop_col, table, pk_where ? " where " + pk_where : "",
    	   format_rec_compact (new_props));
    
      return encoded_props;
    }
    
    protected void update_props (Sql.mysql conn, string pk_where,
    			     mapping(string:mixed) prop_changes)
    // Updates the properties column by retrieving the current properties
    // and merging prop_changes into them. prop_changes is assumed to only
    // contain properties.
    {
      string encoded_props = get_and_merge_props (conn, pk_where, prop_changes);
      conn->big_query ("UPDATE `" + table + "` "
    		   "SET `" + prop_col + "`="
    		   "_binary\"" + quote (encoded_props) + "\" "
    		   "WHERE " + pk_where + " "
    		   "LIMIT 1",	// In case the WHERE condition is bad.
    		   0, query_charset);
    }
    
    protected mapping(string:mixed) update_pack_fields (
      Sql.mysql conn, mapping(string:mixed) rec, string pk_where,
      int replace_properties)
    // Returns a record mapping where all fields that don't have columns
    // have been packed into a prop_col entry. The table is queried if
    // necessary to obtain the old prop_col fields for merging.
    {
      mapping(string:mixed) real_cols = col_types & rec;
      mapping(string:mixed) other_fields = rec - real_cols;
    
      if (sizeof (other_fields)) {
    #ifdef DEBUG
        if (!prop_col) error ("Column(s) %s missing in table %O.\n",
    			  String.implode_nicely (indices (other_fields)),
    			  table);
    #endif
        if (replace_properties) pk_where = 0;
        else if (!pk_where) pk_where = make_pk_where (rec + ([]));
        real_cols[prop_col] = get_and_merge_props (conn, pk_where, other_fields);
      }
    
      else if (replace_properties && prop_col)
        real_cols[prop_col] = "";
    
      return real_cols;
    }
    
    protected string make_set_clause (mapping(string:mixed) rec,
    				  int set_all_columns)
    // Returns a SET clause like "a=1,b=2" as is used in an UPDATE
    // statement. All fields are assumed to have columns. If
    // set_all_columns is given then all unmentioned columns, except the
    // primary key columns, are set to their default values.
    {
      String.Buffer buf = String.Buffer();
    
      int first = 1;
      foreach (rec; string col; mixed val) {
        if (first) first = 0; else buf->add (",");
        buf->add ("`", col, "`=");
        add_mysql_value (buf, col, val);
      }
    
      if (set_all_columns)
        foreach (col_types - rec - pk_cols; string col;) {
          if (first) first = 0; else buf->add (",");
          buf->add ("`", col, "`=DEFAULT(`", col, "`)");
        }
    
      return buf->get();
    }
    #endif