---
server/base_server/roxenloader.pike | 235 +++++++++++++++++++++--------------
server/etc/modules/DBManager.pmod | 48 ++++---
server/modules/database/sqltag.pike | 29 +++-
server/modules/tags/rxmltags.pike | 4 +
server/protocols/http.pike | 4 +
5 files changed, 199 insertions(+), 121 deletions(-)
diff --git a/server/base_server/roxenloader.pike b/server/base_server/roxenloader.pike
index c94158f..6c54153 100644
--- a/server/base_server/roxenloader.pike
+++ b/server/base_server/roxenloader.pike
@@ -1366,15 +1366,45 @@ string query_configuration_dir()
}
protected mapping(string:array(SQLTimeout)) sql_free_list = ([ ]);
-protected Thread.Local sql_reuse_in_thread = Thread.Local();
mapping(string:int) sql_active_list = ([ ]);
#ifdef DB_DEBUG
protected int sql_keynum;
mapping(int:string) my_mysql_last_user = ([]);
multiset(Sql.Sql) all_wrapped_sql_objects = set_weak_flag( (<>), 1 );
-#endif /* DB_DEBUG */
+string strip_db(string dbname) // DB_DEBUG helper: runs dbname through replace() with the abbreviation table below.
+{ return replace(dbname,([
+ // "something longish":"a shortcut"
+ ])); // the table is empty as shipped; add "long":"short" pairs above to shorten names in debug output
+}
+
+// DB_DEBUG helper: summarises the pooled (free) connections per
+// database, keyed by strip_db(name).  The "local:rw", "mysql:rw" and
+// "roxen:rw" pools are left out of the result.
+mapping host_info_sql_free_list()
+{ mapping m = ([]);
+  foreach(sql_free_list;string name;array sqklist)
+    switch(name)
+    { default:
+        foreach(sqklist;;object sqk)
+          // NOTE(review): this statement read `+= ();`, which is not
+          // valid Pike.  The pooled entry itself is collected as the
+          // smallest compilable reconstruction -- confirm the intended
+          // value (possibly the wrapped connection's host_info()).
+          m[strip_db(name)] += ({sqk});
+      // Falls through into the empty statement of the skipped pools.
+      case "local:rw":case "mysql:rw":case "roxen:rw":;
+    }
+  return m;
+}
+
+// DB_DEBUG helper: maps strip_db(name) to the host_info() of the
+// connection stored under name in sqklist (db name -> connection
+// object).  The "local:rw", "mysql:rw" and "roxen:rw" entries are left
+// out; a zero sqklist yields an empty mapping.
+mapping sessiondb_list(mapping sqklist)
+{ mapping m = ([]);
+  if(sqklist)
+    foreach(sqklist;string name;object sqk)
+      switch(name)
+      { default:
+          // Report only the connection stored under this name.  A
+          // nested loop used to re-walk the whole mapping here (and
+          // shadow sqk), filing every connection under every name.
+          m[strip_db(name)] += ({sqk && sqk->host_info()});
+        // Falls through into the empty statement of the skipped names.
+        case "local:rw":case "mysql:rw":case "roxen:rw":;
+      }
+  return m;
+}
+
+#endif /* DB_DEBUG */
//! @appears clear_connect_to_my_mysql_cache
void clear_connect_to_my_mysql_cache( )
@@ -1388,10 +1418,36 @@ mapping(string:int) get_sql_free_list_status()
return map(sql_free_list, sizeof);
}
+protected int lastdbinstance;
+
+void weed_connect_to_my_mysql_cache( string name ) // Trims the free list: drops one entry from a rotating "victim" pool, then halves the pool of name.
+{
+ if(++lastdbinstance>=sizeof(sql_free_list)) // advance the rotating index, wrapping to 0 past the end
+ lastdbinstance=0;
+ string dbinstance=indices(sql_free_list)[lastdbinstance]; // NOTE(review): presumably requires a non-empty sql_free_list; the visible caller adds an entry first
+ if(name!=dbinstance) { // the victim pool is only touched when it is not the pool being halved below
+ array flist = sql_free_list[ dbinstance ];
+ int n=sizeof( flist );
+ destruct(flist[n-1]); // force close the descriptor
+ if( n > 1)
+ sql_free_list[ dbinstance ] = flist[..n-2]; // keep everything but the destructed last entry
+ else
+ m_delete( sql_free_list, dbinstance ); // that was the only entry: remove the pool altogether
+ }
+ array flist = sql_free_list[ name ]; // NOTE(review): assumes name has a free-list entry -- confirm for callers other than sq_cache_free()
+ int n=sizeof( flist );
+ if( n > 1 ) {
+ for(int i=n/2;i<n;i++) // destruct the entries from index n/2 to the end
+ destruct(flist[i]); // force close the descriptor
+ sql_free_list[ name ] = flist[..n/2-1]; // keep the first n/2 entries
+ }
+}
+
+#define SQLTIMEOUT (5*60)
+
protected class SQLTimeout(protected Sql.Sql real)
{
- // 5 minutes timeout.
- protected int timeout = time(1) + 5*60;
+ protected int timeout = time(1) + SQLTIMEOUT;
protected int(0..1) `!()
{
@@ -1402,13 +1458,51 @@ protected class SQLTimeout(protected Sql.Sql real)
}
Sql.Sql get()
{
- if (timeout < time(1)) {
- real = 0;
- }
- Sql.Sql res = real;
- real = 0;
- return res;
+ timeout = time(1) + SQLTIMEOUT;
+ return real;
+ }
+}
+
+void sq_session_free(mapping sessiondbs) // Hands every entry of sessiondbs (db name -> connection) to sq_cache_free().
+{
+ foreach(sessiondbs; string db_name; object real)
+ sq_cache_free(db_name, real); // sq_cache_free() ignores zero connections
+}
+
+// Returns the connection real for db_name to the pool: the active
+// count for db_name is decremented and the connection is pushed onto
+// the front of sql_free_list[db_name], wrapped in an SQLTimeout.  When
+// that pool grows beyond 16 entries it is trimmed through
+// weed_connect_to_my_mysql_cache().  A zero real is ignored.  With
+// NO_DB_REUSE defined nothing is pooled and a gc is scheduled instead.
+protected void sq_cache_free(string db_name, Sql.Sql real)
+{
+  if(!real)
+    return;
+#ifdef DB_DEBUG
+  all_wrapped_sql_objects[real]=0;
+#endif
+
+#ifndef NO_DB_REUSE
+  // Best effort: carry on even if taking the cache lock throws.
+  mixed key;
+  catch {
+    key = sq_cache_lock();
+  };
+
+#ifdef DB_DEBUG
+  // This debug code was moved here from SQLKey and still referred to
+  // "name" and the per-key "num", neither of which exists in this
+  // function.  Only db_name is logged now.
+  // NOTE(review): the key's my_mysql_last_user entry has to be removed
+  // where the key number is known (inside SQLKey).
+  if(db_name!="local:rw" && db_name!="roxen:rw" && db_name!="mysql:rw")
+    werror("%O added to free list\n", db_name );
+#endif
+  if( !--sql_active_list[db_name] )
+    m_delete( sql_active_list, db_name );
+  // Newest connection first.
+  sql_free_list[ db_name ] = ({ SQLTimeout(real) }) +
+    (sql_free_list[ db_name ]||({}));
+  if( sizeof(sql_free_list[db_name]) > 16 )
+  {
+#ifdef DB_DEBUG
+    werror("Free list too large. Cleaning.\n" );
+#endif
+    weed_connect_to_my_mysql_cache(db_name);
}
+#else
+  // Slow'R'us
+  call_out(gc,0);
+#endif
}
protected class SQLResKey
@@ -1469,6 +1563,7 @@ protected class SQLResKey
case "fetch_fields": return fetch_fields;
case "seek": return seek;
case "fetch_row": return fetch_row;
+ case "release": return release;
}
return real[what];
}
@@ -1478,15 +1573,14 @@ protected class SQLResKey
return sprintf( "SQLRes( X, %O )", key );
}
+ protected void release() // Forwards to release() on the key object this result wrapper holds.
+ {
+ key->release();
+ }
+
protected void destroy()
{
- if (key->reuse_in_thread) {
- // FIXME: This won't work well; destroy() might get called from
- // any thread when an object is refcount garbed.
- mapping(string:Sql.Sql) dbs_for_thread = sql_reuse_in_thread->get();
- if (!dbs_for_thread[key->db_name])
- dbs_for_thread[key->db_name] = key->real;
- }
+ release();
#if 0
werror("Destroying %O\n", this_object());
#endif
@@ -1497,7 +1591,7 @@ protected class SQLKey
{
protected Sql.Sql real;
protected string db_name;
- protected int reuse_in_thread;
+ protected mapping sessiondbs;
protected int `!( ) { return !real; }
@@ -1509,11 +1603,6 @@ protected class SQLKey
Sql.sql_result big_query( string f, mixed ... args )
{
if (Sql.sql_result o = real->big_query( f, @args )) {
- if (reuse_in_thread) {
- mapping(string:Sql.Sql) dbs_for_thread = sql_reuse_in_thread->get();
- if (dbs_for_thread[db_name] == real)
- m_delete (dbs_for_thread, db_name);
- }
return [object(Sql.sql_result)] (object) SQLResKey (o, this);
}
return 0;
@@ -1523,18 +1612,11 @@ protected class SQLKey
protected int num = sql_keynum++;
protected string bt;
#endif
- protected void create( Sql.Sql real, string db_name, int reuse_in_thread)
+ protected void create( Sql.Sql real, string db_name, void|mapping sessiondbs)
{
this_program::real = real;
this_program::db_name = db_name;
- this_program::reuse_in_thread = reuse_in_thread;
-
- if (reuse_in_thread) {
- mapping(string:Sql.Sql) dbs_for_thread = sql_reuse_in_thread->get();
- if (!dbs_for_thread) sql_reuse_in_thread->set (dbs_for_thread = ([]));
- if (!dbs_for_thread[db_name])
- dbs_for_thread[db_name] = real;
- }
+ this_program::sessiondbs = sessiondbs;
#ifdef DB_DEBUG
if( !real )
@@ -1557,48 +1639,20 @@ protected class SQLKey
#endif /* DB_DEBUG */
}
- protected void destroy()
+ protected void release() // Parks the connection in sessiondbs[db_name] for reuse; does nothing when there is no connection or no sessiondbs.
{
- // FIXME: Ought to be abstracted to an sq_cache_free().
-#ifdef DB_DEBUG
- all_wrapped_sql_objects[real]=0;
-#endif
-
- if (reuse_in_thread) {
- // FIXME: This won't work well; destroy() might get called from
- // any thread when an object is refcount garbed.
- mapping(string:Sql.Sql) dbs_for_thread = sql_reuse_in_thread->get();
- if (dbs_for_thread[db_name] == real) {
- m_delete (dbs_for_thread, db_name);
- if (!sizeof (dbs_for_thread)) sql_reuse_in_thread->set (0);
- }
+ if (real && sessiondbs) {
+ Sql.Sql oldsess;
+ if (oldsess = sessiondbs[db_name]) // a connection is already parked under this name
+ sq_cache_free(db_name, oldsess); // hand the parked one to sq_cache_free() before replacing it
+ sessiondbs[db_name] = real;
+ real = 0; // this key no longer holds the connection
}
+ }
-#ifndef NO_DB_REUSE
- mixed key;
- catch {
- key = sq_cache_lock();
- };
-
-#ifdef DB_DEBUG
- werror("%O:%d added to free list\n", db_name, num );
- m_delete(my_mysql_last_user, num);
-#endif
- if( !--sql_active_list[db_name] )
- m_delete( sql_active_list, db_name );
- sql_free_list[ db_name ] = ({ SQLTimeout(real) }) +
- (sql_free_list[ db_name ]||({}));
- if( `+( 0, @map(values( sql_free_list ),sizeof ) ) > 20 )
- {
-#ifdef DB_DEBUG
- werror("Free list too large. Cleaning.\n" );
-#endif
- clear_connect_to_my_mysql_cache();
- }
-#else
- // Slow'R'us
- call_out(gc,0);
-#endif
+ // Runs when the key is destructed.  release() parks a session-bound
+ // connection in sessiondbs and clears real.  Any connection still
+ // held after that is handed to sq_cache_free(), so that it goes back
+ // onto the free list and its sql_active_list count is dropped;
+ // otherwise connections fetched without sessiondbs are never pooled
+ // again.
+ protected void destroy()
+ {
+   release();
+   if (real) {
+     sq_cache_free(db_name, real);
+     real = 0;
+   }
}
protected mixed `[]( string what )
@@ -1612,9 +1666,9 @@ protected class SQLKey
{
case "real": return real;
case "db_name": return db_name;
- case "reuse_in_thread": return reuse_in_thread;
case "query": return query;
case "big_query": return big_query;
+ case "release": return release;
}
return real[what];
}
@@ -1639,32 +1693,29 @@ Thread.MutexKey sq_cache_lock()
protected mapping(program:string) default_db_charsets = ([]);
Sql.Sql sq_cache_get( string db_name,
- void|int reuse_in_thread, void|string charset)
+ void|mapping sessiondbs, void|string charset)
{
Sql.Sql db;
+ array flist;
- if (reuse_in_thread) {
- mapping(string:Sql.Sql) dbs_for_thread = sql_reuse_in_thread->get();
- db = dbs_for_thread && dbs_for_thread[db_name];
+ if (sessiondbs && (db = sessiondbs[db_name])) {
+ m_delete(sessiondbs, db_name);
}
-
- else {
- while(sql_free_list[ db_name ])
+ else if(flist = sql_free_list[ db_name ])
{
#ifdef DB_DEBUG
+ if(db_name!="local:rw" && db_name!="roxen:rw" && db_name!="mysql:rw")
werror("%O found in free list\n", db_name );
#endif
- SQLTimeout res = sql_free_list[db_name][0];
- if( sizeof( sql_free_list[ db_name ] ) > 1)
- sql_free_list[ db_name ] = sql_free_list[db_name][1..];
+ SQLTimeout res = flist[0]->get();
+ int n = sizeof(flist);
+ if(n>1 && !(n==2 && !flist[n-1]))
+ sql_free_list[ db_name ] = flist[1..n-1-!flist[n-1]];
else
m_delete( sql_free_list, db_name );
- if ((db = res && res->get())) {
sql_active_list[db_name]++;
- break;
- }
+ return [object(Sql.Sql)] (object) SQLKey(res, db_name, sessiondbs);
}
- }
if (db) {
if (object master_sql = db->master_sql)
@@ -1679,7 +1730,7 @@ Sql.Sql sq_cache_get( string db_name,
db->set_charset (charset);
}
}
- return [object(Sql.Sql)] (object) SQLKey (db, db_name, reuse_in_thread);
+ return [object(Sql.Sql)] (object) SQLKey (db, db_name, sessiondbs);
}
return 0;
@@ -1696,7 +1747,7 @@ Sql.Sql sq_cache_get( string db_name,
} while (0)
Sql.Sql sq_cache_set( string db_name, Sql.Sql res,
- void|int reuse_in_thread, void|string charset)
+ void|mapping sessiondbs, void|string charset)
// Should only be called with a "virgin" Sql.Sql object that has never
// been used or had its charset changed.
{
@@ -1704,7 +1755,7 @@ Sql.Sql sq_cache_set( string db_name, Sql.Sql res,
{
FIX_CHARSET_FOR_NEW_SQL_CONN (res, charset);
sql_active_list[ db_name ]++;
- return [object(Sql.Sql)] (object) SQLKey( res, db_name, reuse_in_thread);
+ return [object(Sql.Sql)] (object) SQLKey( res, db_name, sessiondbs);
}
}
@@ -1712,7 +1763,7 @@ Sql.Sql sq_cache_set( string db_name, Sql.Sql res,
* avoided by normal users.
*/
Sql.Sql connect_to_my_mysql( string|int ro, void|string db,
- void|int reuse_in_thread, void|string charset)
+ void|mapping sessiondbs, void|string charset)
{
#if 0
#ifdef DB_DEBUG
@@ -1730,14 +1781,14 @@ Sql.Sql connect_to_my_mysql( string|int ro, void|string db,
return res;
}
string i = db+":"+(intp(ro)?(ro&&"ro")||"rw":ro);
- Sql.Sql res = sq_cache_get(i, reuse_in_thread, charset);
+ Sql.Sql res = sq_cache_get(i, sessiondbs, charset);
if (res) return res;
destruct(key);
if (res = low_connect_to_my_mysql( ro, db )) {
key = sq_cache_lock();
// Fool the optimizer so that key is not released prematurely
if( res )
- return sq_cache_set(i, res, reuse_in_thread, charset);
+ return sq_cache_set(i, res, sessiondbs, charset);
}
return 0;
}
diff --git a/server/etc/modules/DBManager.pmod b/server/etc/modules/DBManager.pmod
index c3f730d..6a699c6 100644
--- a/server/etc/modules/DBManager.pmod
+++ b/server/etc/modules/DBManager.pmod
@@ -388,20 +388,20 @@ private
mapping(string:mapping(string:string)) sql_url_cache = ([]);
};
-Sql.Sql low_get( string user, string db, void|int reuse_in_thread,
+Sql.Sql low_get( string user, string db, void|mapping sessiondbs,
void|string charset)
{
if( !user )
return 0;
#ifdef MODULE_DEBUG
- if (!reuse_in_thread)
+ if (!sessiondbs)
if (mapping(string:TableLockInfo) dbs = table_locks->get())
if (TableLockInfo lock_info = dbs[db])
werror ("Warning: Another connection was requested to %O "
"in a thread that has locked tables %s.\n"
"It's likely that this will result in a deadlock - "
- "consider using the reuse_in_thread flag.\n",
+ "consider using sessiondbs.\n",
db,
String.implode_nicely (indices (lock_info->locked_for_read &
lock_info->locked_for_write)));
@@ -419,7 +419,7 @@ Sql.Sql low_get( string user, string db, void|int reuse_in_thread,
}
if( (int)d->local )
- return connect_to_my_mysql( user, db, reuse_in_thread,
+ return connect_to_my_mysql( user, db, sessiondbs,
charset || d->default_charset );
// Otherwise it's a tad more complex...
@@ -428,9 +428,9 @@ Sql.Sql low_get( string user, string db, void|int reuse_in_thread,
// has, but they are hidden behind an overloaded index operator.
// Thus, we have to fool the typechecker.
return [object(Sql.Sql)](object)
- ROWrapper( sql_cache_get( d->path, reuse_in_thread,
+ ROWrapper( sql_cache_get( d->path, sessiondbs,
charset || d->default_charset) );
- return sql_cache_get( d->path, reuse_in_thread,
+ return sql_cache_get( d->path, sessiondbs,
charset || d->default_charset);
}
@@ -443,12 +443,17 @@ Sql.Sql get_sql_handler(string db_url)
return Sql.Sql(db_url);
}
-Sql.Sql sql_cache_get(string what, void|int reuse_in_thread,
+void sql_session_free(mapping sessiondbs) // Thin wrapper: passes sessiondbs on to roxenloader.sq_session_free().
+{
+ roxenloader.sq_session_free(sessiondbs);
+}
+
+Sql.Sql sql_cache_get(string what, void|mapping sessiondbs,
void|string charset)
{
Thread.MutexKey key = roxenloader.sq_cache_lock();
string i = replace(what,":",";")+":-";
- Sql.Sql res = roxenloader.sq_cache_get(i, reuse_in_thread, charset);
+ Sql.Sql res = roxenloader.sq_cache_get(i, sessiondbs, charset);
if (res) return res;
// Release the lock during the call to get_sql_handler(),
// since it may take quite a bit of time...
@@ -456,7 +461,7 @@ Sql.Sql sql_cache_get(string what, void|int reuse_in_thread,
if (res = get_sql_handler(what)) {
// Now we need the lock again...
key = roxenloader.sq_cache_lock();
- res = roxenloader.sq_cache_set(i, res, reuse_in_thread, charset);
+ res = roxenloader.sq_cache_set(i, res, sessiondbs, charset);
// Fool the optimizer so that key is not released prematurely
if( res )
return res;
@@ -881,7 +886,7 @@ int is_valid_db_user (string user)
}
Sql.Sql get( string name, void|Configuration conf,
- int|void read_only, void|int reuse_in_thread,
+ int|void read_only, void|mapping sessiondbs,
void|string charset)
//! Returns an SQL connection object for a database named under the
//! "DB" tab in the administration interface.
@@ -898,20 +903,20 @@ Sql.Sql get( string name, void|Configuration conf,
//! connection is also returned if @[conf] is specified and only has
//! read access (regardless of @[read_only]).
//!
-//! @param reuse_in_thread
+//! @param sessiondbs
//! If this is nonzero then the SQL connection is reused within the
-//! current thread. I.e. other calls to this function from this
-//! thread with the same @[name] and @[read_only] and a nonzero
-//! @[reuse_in_thread] will return the same object. However, the
+//! current session. I.e. other calls to this function from this
+//! session with the same @[name] and a nonzero
+//! @[sessiondbs] will return the same object. However, the
//! connection won't be reused while a result object from
//! @[Sql.Sql.big_query] or similar exists.
//!
-//! Using this flag is a good way to cut down on the amount of
+//! Using this method is a good way to cut down on the amount of
//! simultaneous connections, and to avoid deadlocks when
//! transactions or locked tables are used (other problems can occur
//! instead though, if transactions or table locking is done
//! recursively). However, the caller has to ensure that the
-//! connection never becomes in use by another thread. The safest
+//! connection never becomes in use by another session. The safest
//! way to ensure that is to always keep it on the stack, i.e. only
//! assign it to variables declared inside functions or pass it in
//! arguments to functions.
@@ -940,7 +945,7 @@ Sql.Sql get( string name, void|Configuration conf,
//! for that matter) is changed some other way then it must be
//! restored before the connection is released.
{
- return low_get( get_db_user( name, conf, read_only), name, reuse_in_thread,
+ return low_get( get_db_user( name, conf, read_only), name, sessiondbs,
charset);
}
@@ -968,14 +973,11 @@ class MySQLTablesLock
//! o It ensures UNLOCK TABLES always gets executed on exit through
//! the refcount garb strategy (i.e. put it in a local variable
//! just like a @[Thread.MutexKey]).
-//! o It checks that the @[reuse_in_thread] flag was used to
-//! @[DBManager.get] to ensure that a thread doesn't outlock itself
-//! by using different connections.
//!
//! Note that atomic queries and updates don't require
//! @[MySQLTablesLock] stuff even when it's used in other places at
//! the same time. They should however use a connection retrieved with
-//! @[reuse_in_thread] set to avoid deadlocks.
+//! @[sessiondbs] set to avoid deadlocks.
{
protected TableLockInfo lock_info;
@@ -988,8 +990,8 @@ class MySQLTablesLock
{
if (!db->db_name)
error ("db was not retrieved with DBManager.get().\n");
- if (!db->reuse_in_thread)
- error ("db was not retrieved with DBManager.get(x,y,z,1).\n");
+ if (!db->sessiondbs)
+ error ("db was not retrieved with DBManager.get(x,y,z,sessiondbs).\n");
multiset(string) read_tbl = (<>);
foreach (read_tables || ({}), string tbl) {
diff --git a/server/modules/database/sqltag.pike b/server/modules/database/sqltag.pike
index fdc0eae..71b6c98 100644
--- a/server/modules/database/sqltag.pike
+++ b/server/modules/database/sqltag.pike
@@ -328,8 +328,7 @@ protected string get_restricted_ro_user()
Sql.Sql get_rxml_sql_con (string db, void|string host, void|RequestID id,
- void|int read_only, void|int reuse_in_thread,
- void|string charset)
+ void|int read_only, void|string charset)
//! This function is useful from other modules via the
//! @tt provider interface: It applies the security
//! settings configured in this module to check whether the requested
@@ -349,7 +348,6 @@ Sql.Sql get_rxml_sql_con (string db, void|string host, void|RequestID id,
//! Note: Optional.
//!
//! @param read_only
-//! @param reuse_in_thread
//! @param charset
//! Passed on to @[DBManager.get] (if called). The default charset
//! configured in this module is used if @[charset] isn't given.
@@ -421,13 +419,18 @@ Sql.Sql get_rxml_sql_con (string db, void|string host, void|RequestID id,
"from this configuration.\n", db);
}
+ mapping sessiondbs;
+
+ if (id && !(sessiondbs = id->misc->sessiondbs))
+ id->misc->sessiondbs = sessiondbs = ([]);
+
error = catch {
- con = DBManager.low_get (db_user, db, reuse_in_thread, charset);
+ con = DBManager.low_get (db_user, db, sessiondbs, charset);
};
}
if (error || !con) {
-#if 0
+#if 1 // srb
werror (describe_backtrace (error));
#endif
RXML.run_error(error ? describe_error (error) :
@@ -515,7 +518,7 @@ array|Sql.sql_result do_sql_query(mapping args, RequestID id,
}
else
{
- con = get_rxml_sql_con (args->db, host, id, ro, 0, conn_charset);
+ con = get_rxml_sql_con (args->db, host, id, ro, conn_charset);
function query_fn = (big_query ? con->big_query : con->query);
if( error = catch( result = (bindings ? query_fn(args->query, bindings) :
query_fn(args->query))) ) {
@@ -595,6 +598,7 @@ class SqlEmitResponse {
if(sqlres && (val = sqlres->fetch_row()))
fetched++;
else {
+ release();
sqlres = 0;
return 0;
}
@@ -682,6 +686,17 @@ class SqlEmitResponse {
}
}
}
+
+ void release() { // Calls the result object's release member, if sqlres is set and has one.
+ object srk; // NOTE(review): holds whatever sqlres->release yields and is then called -- presumably a function; confirm the declared type
+ if(sqlres && (srk = sqlres->release)) {
+ srk();
+ }
+ }
+
+ void destroy() { // Runs release() when this response object is destructed.
+ release();
+ }
}
#define GET_CHARSET_AND_ENCODE_QUERY(args, recode_charset) do { \
@@ -848,6 +863,8 @@ class TagSQLTable {
id->misc->defines[" _ok"] = 1;
result=ret;
+ if(res->release)
+ res->release();
return 0;
}
diff --git a/server/modules/tags/rxmltags.pike b/server/modules/tags/rxmltags.pike
index 25fa6e7..2969bb1 100644
--- a/server/modules/tags/rxmltags.pike
+++ b/server/modules/tags/rxmltags.pike
@@ -5821,6 +5821,8 @@ class TagEmit {
do {
ret += ({ res->get_row() });
} while(ret[-1]!=0);
+ if(res->release)
+ res->release();
return ret[..sizeof(ret)-2];
}
@@ -6101,6 +6103,8 @@ class TagEmit {
}
do_iterate = 0;
+ if(objectp(res) && res->release)
+ res->release();
res = 0;
return 0;
}
diff --git a/server/protocols/http.pike b/server/protocols/http.pike
index 6484285..b4cb198 100644
--- a/server/protocols/http.pike
+++ b/server/protocols/http.pike
@@ -1049,6 +1049,10 @@ protected void cleanup_request_object()
{
if( conf )
conf->connection_drop( this_object() );
+ if( misc->sessiondbs ) {
+ DBManager.sql_session_free(misc->sessiondbs);
+ m_delete(misc, "sessiondbs");
+ }
xml_data = 0;
}
|