summaryrefslogtreecommitdiffstats
path: root/pyload/lib/beaker/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyload/lib/beaker/util.py')
-rw-r--r--pyload/lib/beaker/util.py462
1 files changed, 462 insertions, 0 deletions
diff --git a/pyload/lib/beaker/util.py b/pyload/lib/beaker/util.py
new file mode 100644
index 000000000..c7002cd92
--- /dev/null
+++ b/pyload/lib/beaker/util.py
@@ -0,0 +1,462 @@
+"""Beaker utilities"""
+
+try:
+ import thread as _thread
+ import threading as _threading
+except ImportError:
+ import dummy_thread as _thread
+ import dummy_threading as _threading
+
+from datetime import datetime, timedelta
+import os
+import re
+import string
+import types
+import weakref
+import warnings
+import sys
+import inspect
+
+py3k = getattr(sys, 'py3kwarning', False) or sys.version_info >= (3, 0)
+py24 = sys.version_info < (2, 5)
+jython = sys.platform.startswith('java')
+
+if py3k or jython:
+ import pickle
+else:
+ import cPickle as pickle
+
+from beaker.converters import asbool
+from beaker import exceptions
+from threading import local as _tlocal
+
+
+__all__ = ["ThreadLocal", "WeakValuedRegistry", "SyncDict", "encoded_path",
+ "verify_directory"]
+
+
+def function_named(fn, name):
+ """Return a function with a given __name__.
+
+ Will assign to __name__ and return the original function if possible on
+ the Python implementation, otherwise a new function will be constructed.
+
+ """
+ fn.__name__ = name
+ return fn
+
+
+def skip_if(predicate, reason=None):
+ """Skip a test if predicate is true."""
+ reason = reason or predicate.__name__
+
+ from nose import SkipTest
+
+ def decorate(fn):
+ fn_name = fn.__name__
+
+ def maybe(*args, **kw):
+ if predicate():
+ msg = "'%s' skipped: %s" % (
+ fn_name, reason)
+ raise SkipTest(msg)
+ else:
+ return fn(*args, **kw)
+ return function_named(maybe, fn_name)
+ return decorate
+
+
+def assert_raises(except_cls, callable_, *args, **kw):
+ """Assert the given exception is raised by the given function + arguments."""
+
+ try:
+ callable_(*args, **kw)
+ success = False
+ except except_cls:
+ success = True
+
+ # assert outside the block so it works for AssertionError too !
+ assert success, "Callable did not raise an exception"
+
+
+def verify_directory(dir):
+ """verifies and creates a directory. tries to
+ ignore collisions with other threads and processes."""
+
+ tries = 0
+ while not os.access(dir, os.F_OK):
+ try:
+ tries += 1
+ os.makedirs(dir)
+ except:
+ if tries > 5:
+ raise
+
+
+def has_self_arg(func):
+ """Return True if the given function has a 'self' argument."""
+ args = inspect.getargspec(func)
+ if args and args[0] and args[0][0] in ('self', 'cls'):
+ return True
+ else:
+ return False
+
+
+def warn(msg, stacklevel=3):
+ """Issue a warning."""
+ if isinstance(msg, basestring):
+ warnings.warn(msg, exceptions.BeakerWarning, stacklevel=stacklevel)
+ else:
+ warnings.warn(msg, stacklevel=stacklevel)
+
+
+def deprecated(message):
+ def wrapper(fn):
+ def deprecated_method(*args, **kargs):
+ warnings.warn(message, DeprecationWarning, 2)
+ return fn(*args, **kargs)
+ # TODO: use decorator ? functools.wrapper ?
+ deprecated_method.__name__ = fn.__name__
+ deprecated_method.__doc__ = "%s\n\n%s" % (message, fn.__doc__)
+ return deprecated_method
+ return wrapper
+
+
+class ThreadLocal(object):
+ """stores a value on a per-thread basis"""
+
+ __slots__ = '_tlocal'
+
+ def __init__(self):
+ self._tlocal = _tlocal()
+
+ def put(self, value):
+ self._tlocal.value = value
+
+ def has(self):
+ return hasattr(self._tlocal, 'value')
+
+ def get(self, default=None):
+ return getattr(self._tlocal, 'value', default)
+
+ def remove(self):
+ del self._tlocal.value
+
+
+class SyncDict(object):
+ """
+ An efficient/threadsafe singleton map algorithm, a.k.a.
+ "get a value based on this key, and create if not found or not
+ valid" paradigm:
+
+ exists && isvalid ? get : create
+
+ Designed to work with weakref dictionaries to expect items
+ to asynchronously disappear from the dictionary.
+
+ Use python 2.3.3 or greater ! a major bug was just fixed in Nov.
+ 2003 that was driving me nuts with garbage collection/weakrefs in
+ this section.
+
+ """
+ def __init__(self):
+ self.mutex = _thread.allocate_lock()
+ self.dict = {}
+
+ def get(self, key, createfunc, *args, **kwargs):
+ try:
+ if key in self.dict:
+ return self.dict[key]
+ else:
+ return self.sync_get(key, createfunc, *args, **kwargs)
+ except KeyError:
+ return self.sync_get(key, createfunc, *args, **kwargs)
+
+ def sync_get(self, key, createfunc, *args, **kwargs):
+ self.mutex.acquire()
+ try:
+ try:
+ if key in self.dict:
+ return self.dict[key]
+ else:
+ return self._create(key, createfunc, *args, **kwargs)
+ except KeyError:
+ return self._create(key, createfunc, *args, **kwargs)
+ finally:
+ self.mutex.release()
+
+ def _create(self, key, createfunc, *args, **kwargs):
+ self[key] = obj = createfunc(*args, **kwargs)
+ return obj
+
+ def has_key(self, key):
+ return key in self.dict
+
+ def __contains__(self, key):
+ return self.dict.__contains__(key)
+
+ def __getitem__(self, key):
+ return self.dict.__getitem__(key)
+
+ def __setitem__(self, key, value):
+ self.dict.__setitem__(key, value)
+
+ def __delitem__(self, key):
+ return self.dict.__delitem__(key)
+
+ def clear(self):
+ self.dict.clear()
+
+
+class WeakValuedRegistry(SyncDict):
+ def __init__(self):
+ self.mutex = _threading.RLock()
+ self.dict = weakref.WeakValueDictionary()
+
+sha1 = None
+
+
+def encoded_path(root, identifiers, extension=".enc", depth=3,
+ digest_filenames=True):
+
+ """Generate a unique file-accessible path from the given list of
+ identifiers starting at the given root directory."""
+ ident = "_".join(identifiers)
+
+ global sha1
+ if sha1 is None:
+ from beaker.crypto import sha1
+
+ if digest_filenames:
+ if py3k:
+ ident = sha1(ident.encode('utf-8')).hexdigest()
+ else:
+ ident = sha1(ident).hexdigest()
+
+ ident = os.path.basename(ident)
+
+ tokens = []
+ for d in range(1, depth):
+ tokens.append(ident[0:d])
+
+ dir = os.path.join(root, *tokens)
+ verify_directory(dir)
+
+ return os.path.join(dir, ident + extension)
+
+
+def asint(obj):
+ if isinstance(obj, int):
+ return obj
+ elif isinstance(obj, basestring) and re.match(r'^\d+$', obj):
+ return int(obj)
+ else:
+ raise Exception("This is not a proper int")
+
+
+def verify_options(opt, types, error):
+ if not isinstance(opt, types):
+ if not isinstance(types, tuple):
+ types = (types,)
+ coerced = False
+ for typ in types:
+ try:
+ if typ in (list, tuple):
+ opt = [x.strip() for x in opt.split(',')]
+ else:
+ if typ == bool:
+ typ = asbool
+ elif typ == int:
+ typ = asint
+ elif typ in (timedelta, datetime):
+ if not isinstance(opt, typ):
+ raise Exception("%s requires a timedelta type", typ)
+ opt = typ(opt)
+ coerced = True
+ except:
+ pass
+ if coerced:
+ break
+ if not coerced:
+ raise Exception(error)
+ elif isinstance(opt, str) and not opt.strip():
+ raise Exception("Empty strings are invalid for: %s" % error)
+ return opt
+
+
+def verify_rules(params, ruleset):
+ for key, types, message in ruleset:
+ if key in params:
+ params[key] = verify_options(params[key], types, message)
+ return params
+
+
+def coerce_session_params(params):
+ rules = [
+ ('data_dir', (str, types.NoneType), "data_dir must be a string "
+ "referring to a directory."),
+ ('lock_dir', (str, types.NoneType), "lock_dir must be a string referring to a "
+ "directory."),
+ ('type', (str, types.NoneType), "Session type must be a string."),
+ ('cookie_expires', (bool, datetime, timedelta, int), "Cookie expires was "
+ "not a boolean, datetime, int, or timedelta instance."),
+ ('cookie_domain', (str, types.NoneType), "Cookie domain must be a "
+ "string."),
+ ('cookie_path', (str, types.NoneType), "Cookie path must be a "
+ "string."),
+ ('id', (str,), "Session id must be a string."),
+ ('key', (str,), "Session key must be a string."),
+ ('secret', (str, types.NoneType), "Session secret must be a string."),
+ ('validate_key', (str, types.NoneType), "Session encrypt_key must be "
+ "a string."),
+ ('encrypt_key', (str, types.NoneType), "Session validate_key must be "
+ "a string."),
+ ('secure', (bool, types.NoneType), "Session secure must be a boolean."),
+ ('httponly', (bool, types.NoneType), "Session httponly must be a boolean."),
+ ('timeout', (int, types.NoneType), "Session timeout must be an "
+ "integer."),
+ ('auto', (bool, types.NoneType), "Session is created if accessed."),
+ ('webtest_varname', (str, types.NoneType), "Session varname must be "
+ "a string."),
+ ]
+ opts = verify_rules(params, rules)
+ cookie_expires = opts.get('cookie_expires')
+ if cookie_expires and isinstance(cookie_expires, int) and \
+ not isinstance(cookie_expires, bool):
+ opts['cookie_expires'] = timedelta(seconds=cookie_expires)
+ return opts
+
+
+def coerce_cache_params(params):
+ rules = [
+ ('data_dir', (str, types.NoneType), "data_dir must be a string "
+ "referring to a directory."),
+ ('lock_dir', (str, types.NoneType), "lock_dir must be a string referring to a "
+ "directory."),
+ ('type', (str,), "Cache type must be a string."),
+ ('enabled', (bool, types.NoneType), "enabled must be true/false "
+ "if present."),
+ ('expire', (int, types.NoneType), "expire must be an integer representing "
+ "how many seconds the cache is valid for"),
+ ('regions', (list, tuple, types.NoneType), "Regions must be a "
+ "comma seperated list of valid regions"),
+ ('key_length', (int, types.NoneType), "key_length must be an integer "
+ "which indicates the longest a key can be before hashing"),
+ ]
+ return verify_rules(params, rules)
+
+
+def coerce_memcached_behaviors(behaviors):
+ rules = [
+ ('cas', (bool, int), 'cas must be a boolean or an integer'),
+ ('no_block', (bool, int), 'no_block must be a boolean or an integer'),
+ ('receive_timeout', (int,), 'receive_timeout must be an integer'),
+ ('send_timeout', (int,), 'send_timeout must be an integer'),
+ ('ketama_hash', (str,), 'ketama_hash must be a string designating '
+ 'a valid hashing strategy option'),
+ ('_poll_timeout', (int,), '_poll_timeout must be an integer'),
+ ('auto_eject', (bool, int), 'auto_eject must be an integer'),
+ ('retry_timeout', (int,), 'retry_timeout must be an integer'),
+ ('_sort_hosts', (bool, int), '_sort_hosts must be an integer'),
+ ('_io_msg_watermark', (int,), '_io_msg_watermark must be an integer'),
+ ('ketama', (bool, int), 'ketama must be a boolean or an integer'),
+ ('ketama_weighted', (bool, int), 'ketama_weighted must be a boolean or '
+ 'an integer'),
+ ('_io_key_prefetch', (int, bool), '_io_key_prefetch must be a boolean '
+ 'or an integer'),
+ ('_hash_with_prefix_key', (bool, int), '_hash_with_prefix_key must be '
+ 'a boolean or an integer'),
+ ('tcp_nodelay', (bool, int), 'tcp_nodelay must be a boolean or an '
+ 'integer'),
+ ('failure_limit', (int,), 'failure_limit must be an integer'),
+ ('buffer_requests', (bool, int), 'buffer_requests must be a boolean '
+ 'or an integer'),
+ ('_socket_send_size', (int,), '_socket_send_size must be an integer'),
+ ('num_replicas', (int,), 'num_replicas must be an integer'),
+ ('remove_failed', (int,), 'remove_failed must be an integer'),
+ ('_noreply', (bool, int), '_noreply must be a boolean or an integer'),
+ ('_io_bytes_watermark', (int,), '_io_bytes_watermark must be an '
+ 'integer'),
+ ('_socket_recv_size', (int,), '_socket_recv_size must be an integer'),
+ ('distribution', (str,), 'distribution must be a string designating '
+ 'a valid distribution option'),
+ ('connect_timeout', (int,), 'connect_timeout must be an integer'),
+ ('hash', (str,), 'hash must be a string designating a valid hashing '
+ 'option'),
+ ('verify_keys', (bool, int), 'verify_keys must be a boolean or an integer'),
+ ('dead_timeout', (int,), 'dead_timeout must be an integer')
+ ]
+ return verify_rules(behaviors, rules)
+
+
+def parse_cache_config_options(config, include_defaults=True):
+ """Parse configuration options and validate for use with the
+ CacheManager"""
+
+ # Load default cache options
+ if include_defaults:
+ options = dict(type='memory', data_dir=None, expire=None,
+ log_file=None)
+ else:
+ options = {}
+ for key, val in config.iteritems():
+ if key.startswith('beaker.cache.'):
+ options[key[13:]] = val
+ if key.startswith('cache.'):
+ options[key[6:]] = val
+ coerce_cache_params(options)
+
+ # Set cache to enabled if not turned off
+ if 'enabled' not in options and include_defaults:
+ options['enabled'] = True
+
+ # Configure region dict if regions are available
+ regions = options.pop('regions', None)
+ if regions:
+ region_configs = {}
+ for region in regions:
+ if not region: # ensure region name is valid
+ continue
+ # Setup the default cache options
+ region_options = dict(data_dir=options.get('data_dir'),
+ lock_dir=options.get('lock_dir'),
+ type=options.get('type'),
+ enabled=options['enabled'],
+ expire=options.get('expire'),
+ key_length=options.get('key_length', 250))
+ region_prefix = '%s.' % region
+ region_len = len(region_prefix)
+ for key in options.keys():
+ if key.startswith(region_prefix):
+ region_options[key[region_len:]] = options.pop(key)
+ coerce_cache_params(region_options)
+ region_configs[region] = region_options
+ options['cache_regions'] = region_configs
+ return options
+
+
+def parse_memcached_behaviors(config):
+ """Parse behavior options and validate for use with pylibmc
+ client/PylibMCNamespaceManager, or potentially other memcached
+ NamespaceManagers that support behaviors"""
+ behaviors = {}
+
+ for key, val in config.iteritems():
+ if key.startswith('behavior.'):
+ behaviors[key[9:]] = val
+
+ coerce_memcached_behaviors(behaviors)
+ return behaviors
+
+
+def func_namespace(func):
+ """Generates a unique namespace for a function"""
+ kls = None
+ if hasattr(func, 'im_func'):
+ kls = func.im_class
+ func = func.im_func
+
+ if kls:
+ return '%s.%s' % (kls.__module__, kls.__name__)
+ else:
+ return '%s|%s' % (inspect.getsourcefile(func), func.__name__)