import tornadis
from redis import RedisError
from json import loads, dumps
import datetime
class Redis:
def __init__(self, *args, **kwds):
    """Build the underlying ``tornadis.Client``.

    Every positional and keyword argument is forwarded unchanged to the
    ``tornadis.Client`` constructor; the resulting client is kept on
    ``self._client`` and used by all command methods.
    """
    client = tornadis.Client(*args, **kwds)
    self._client = client
# SERVER INFORMATION
async def bgrewriteaof(self):
    """Ask the server to rewrite its AOF file (``BGREWRITEAOF``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('BGREWRITEAOF')
    return reply
async def bgsave(self):
    """Ask the server to save its data to disk in the background.

    Sends ``BGSAVE``; in contrast to ``save()`` the server replies
    without waiting for the save to finish.  Returns the reply produced
    by the client's ``call``.
    """
    reply = await self._client.call('BGSAVE')
    return reply
async def client_kill(self, address):
    """Disconnect the client at ``address`` (``ip:port``).

    ``CLIENT`` and ``KILL`` are passed as separate arguments so that
    each word is sent as its own protocol argument instead of a single
    ``'CLIENT KILL'`` command name.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('CLIENT', 'KILL', address)
async def client_list(self):
    """List the currently connected clients.

    ``CLIENT`` and ``LIST`` are passed as separate arguments so that
    each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('CLIENT', 'LIST')
async def client_getname(self):
    """Fetch the name of the current connection.

    ``CLIENT`` and ``GETNAME`` are passed as separate arguments so that
    each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('CLIENT', 'GETNAME')
async def client_setname(self, name):
    """Set the name of the current connection to ``name``.

    ``CLIENT`` and ``SETNAME`` are passed as separate arguments so that
    each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('CLIENT', 'SETNAME', name)
async def config_get(self, pattern="*"):
    """Fetch the configuration items matching ``pattern``.

    ``CONFIG`` and ``GET`` are passed as separate arguments so that
    each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('CONFIG', 'GET', pattern)
async def config_set(self, name, value):
    """Set the configuration item ``name`` to ``value``.

    ``CONFIG`` and ``SET`` are passed as separate arguments so that
    each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('CONFIG', 'SET', name, value)
async def config_resetstat(self):
    """Reset the server's runtime statistics.

    ``CONFIG`` and ``RESETSTAT`` are passed as separate arguments so
    that each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('CONFIG', 'RESETSTAT')
async def config_rewrite(self):
    """Rewrite the config file to reflect the running configuration.

    ``CONFIG`` and ``REWRITE`` are passed as separate arguments so that
    each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('CONFIG', 'REWRITE')
async def dbsize(self):
    """Query the number of keys in the current database (``DBSIZE``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('DBSIZE')
    return reply
async def debug_object(self, key):
    """Fetch version-specific meta information about ``key``.

    ``DEBUG`` and ``OBJECT`` are passed as separate arguments so that
    each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('DEBUG', 'OBJECT', key)
async def echo(self, value):
    """Have the server echo ``value`` back (``ECHO``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('ECHO', value)
    return reply
async def flushall(self):
    """Delete every key in every database on the current host.

    Sends ``FLUSHALL`` and returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('FLUSHALL')
    return reply
async def flushdb(self):
    """Delete every key in the current database.

    Sends ``FLUSHDB`` and returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('FLUSHDB')
    return reply
async def info(self, section=None):
    """Request server information with ``INFO``.

    When ``section`` is given it is appended to the command so that
    only that section is requested; otherwise plain ``INFO`` is sent.
    Older servers do not support the section argument and answer with
    an error.

    Returns the reply produced by the client's ``call``.
    """
    command = ['INFO']
    if section is not None:
        command.append(section)
    return await self._client.call(*command)
async def lastsave(self):
    """Ask when the database was last saved to disk (``LASTSAVE``).

    The reply produced by the client's ``call`` is returned as-is; no
    conversion is applied here.
    """
    reply = await self._client.call('LASTSAVE')
    return reply
async def object(self, infotype, key):
    """Query ``OBJECT <infotype> <key>`` (encoding, idletime or refcount).

    ``infotype`` is sent as the sub-command and is additionally
    forwarded to ``call`` as the ``infotype`` keyword option.

    Returns the reply produced by the client's ``call``.
    """
    options = {'infotype': infotype}
    reply = await self._client.call('OBJECT', infotype, key, **options)
    return reply
async def ping(self):
    """Ping the server (``PING``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('PING')
    return reply
async def save(self):
    """Ask the server to save its data to disk (``SAVE``).

    The server blocks until the save is complete.  Returns the reply
    produced by the client's ``call``.
    """
    reply = await self._client.call('SAVE')
    return reply
async def sentinel(self, *args):
    """Deprecated placeholder for the raw ``SENTINEL`` command.

    No command is sent; the method only emits a ``DeprecationWarning``
    pointing callers at the individual ``sentinel_*`` methods and
    returns ``None``.
    """
    # Imported locally so the method does not depend on a module-level
    # ``warnings`` import.
    import warnings

    warnings.warn(
        DeprecationWarning('Use the individual sentinel_* methods'))
async def sentinel_get_master_addr_by_name(self, service_name):
    """Look up the master address for ``service_name``.

    ``SENTINEL`` and ``GET-MASTER-ADDR-BY-NAME`` are passed as separate
    arguments so that each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call(
        'SENTINEL', 'GET-MASTER-ADDR-BY-NAME', service_name)
async def sentinel_master(self, service_name):
    """Fetch the state of the master named ``service_name``.

    ``SENTINEL`` and ``MASTER`` are passed as separate arguments so
    that each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('SENTINEL', 'MASTER', service_name)
async def sentinel_masters(self):
    """Fetch the state of every monitored master.

    ``SENTINEL`` and ``MASTERS`` are passed as separate arguments so
    that each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('SENTINEL', 'MASTERS')
async def sentinel_monitor(self, name, ip, port, quorum):
    """Add a new master to Sentinel to be monitored.

    ``SENTINEL`` and ``MONITOR`` are passed as separate arguments so
    that each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call(
        'SENTINEL', 'MONITOR', name, ip, port, quorum)
async def sentinel_remove(self, name):
    """Remove the master ``name`` from Sentinel's monitoring.

    ``SENTINEL`` and ``REMOVE`` are passed as separate arguments so
    that each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('SENTINEL', 'REMOVE', name)
async def sentinel_sentinels(self, service_name):
    """List the sentinels known for ``service_name``.

    ``SENTINEL`` and ``SENTINELS`` are passed as separate arguments so
    that each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('SENTINEL', 'SENTINELS', service_name)
async def sentinel_set(self, name, option, value):
    """Set a Sentinel monitoring parameter for the master ``name``.

    ``SENTINEL`` and ``SET`` are passed as separate arguments so that
    each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('SENTINEL', 'SET', name, option, value)
async def sentinel_slaves(self, service_name):
    """List the slaves known for ``service_name``.

    ``SENTINEL`` and ``SLAVES`` are passed as separate arguments so
    that each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('SENTINEL', 'SLAVES', service_name)
async def shutdown(self):
    """Shut the server down with ``SHUTDOWN``.

    The call is awaited so that an error raised while the server closes
    the connection is actually observed here.

    Returns:
        ``None`` when a ``ConnectionError`` is raised, which is the
        expected outcome of a successful shutdown.

    Raises:
        RedisError: if the command completes without a
            ``ConnectionError`` being raised.
    """
    try:
        await self._client.call('SHUTDOWN')
    except ConnectionError:
        # a ConnectionError here is expected
        return
    # NOTE(review): if the client reports a lost connection by returning
    # an error object instead of raising, this path is still taken --
    # confirm against the tornadis error-handling contract.
    raise RedisError("SHUTDOWN seems to have failed.")
async def slaveof(self, host=None, port=None):
    """Make the server a replica of ``host``:``port`` (``SLAVEOF``).

    When called with neither ``host`` nor ``port`` the command
    ``SLAVEOF NO ONE`` is sent, promoting the instance to a master.
    The ``NO`` / ``ONE`` keywords are sent as plain strings.

    Returns the reply produced by the client's ``call``.
    """
    if host is None and port is None:
        return await self._client.call('SLAVEOF', 'NO', 'ONE')
    return await self._client.call('SLAVEOF', host, port)
async def slowlog_get(self, num=None):
    """Fetch entries from the slowlog.

    If ``num`` is given, it is appended to the command to limit the
    reply to the most recent ``num`` entries.  ``SLOWLOG`` and ``GET``
    are passed as separate arguments so that each word is sent as its
    own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    args = ['SLOWLOG', 'GET']
    if num is not None:
        args.append(num)
    return await self._client.call(*args)
async def slowlog_len(self):
    """Query the number of entries in the slowlog.

    ``SLOWLOG`` and ``LEN`` are passed as separate arguments so that
    each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('SLOWLOG', 'LEN')
async def slowlog_reset(self):
    """Remove every entry from the slowlog.

    ``SLOWLOG`` and ``RESET`` are passed as separate arguments so that
    each word is sent as its own protocol argument.

    Returns the reply produced by the client's ``call``.
    """
    return await self._client.call('SLOWLOG', 'RESET')
async def time(self):
    """Ask the server for its current time (``TIME``).

    The server reports seconds since the epoch and the microseconds
    into the current second; the reply produced by the client's
    ``call`` is returned as-is.
    """
    reply = await self._client.call('TIME')
    return reply
# BASIC KEY COMMANDS
async def append(self, key, value):
    """Append the string ``value`` to the value stored at ``key``.

    Sends ``APPEND``; if ``key`` does not exist yet it is created
    holding ``value``.  Returns the reply produced by the client's
    ``call`` (the server reports the new length).
    """
    reply = await self._client.call('APPEND', key, value)
    return reply
async def bitcount(self, key, start=None, end=None):
    """Count the set bits in the value of ``key`` (``BITCOUNT``).

    ``start`` and ``end`` optionally restrict the bytes considered and
    must be supplied together.

    Raises:
        RedisError: if exactly one of ``start`` / ``end`` is given.
    """
    has_start = start is not None
    has_end = end is not None
    if has_start != has_end:
        raise RedisError("Both start and end must be specified")
    params = [key]
    if has_start:
        params += [start, end]
    return await self._client.call('BITCOUNT', *params)
async def bitop(self, operation, dest, *keys):
    """Apply the bitwise ``operation`` across ``keys`` into ``dest``.

    Sends ``BITOP operation dest key [key ...]`` and returns the reply
    produced by the client's ``call``.
    """
    reply = await self._client.call('BITOP', operation, dest, *keys)
    return reply
async def bitpos(self, key, bit, start=None, end=None):
    """Find the first bit set to ``bit`` (0 or 1) in a string value.

    ``start`` and ``end`` define the search range, interpreted as a
    range of bytes (not bits): ``start=0, end=2`` looks at the first
    three bytes.  ``end`` may only be given together with ``start``.

    Raises:
        RedisError: if ``bit`` is not 0 or 1, or if ``end`` is given
            without ``start``.
    """
    if bit not in (0, 1):
        raise RedisError('bit must be 0 or 1')
    if start is None and end is not None:
        raise RedisError("start argument is not set, "
                         "when end is specified")
    params = [key, bit]
    if start is not None:
        params.append(start)
        if end is not None:
            params.append(end)
    return await self._client.call('BITPOS', *params)
async def decr(self, name, amount=1):
    """Decrement the value of ``name`` by ``amount`` (``DECRBY``).

    If the key does not exist the value is initialised as
    0 - ``amount``.  Returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('DECRBY', name, amount)
    return reply
async def delete(self, *names):
    """Delete one or more keys given by ``names`` (``DEL``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('DEL', *names)
    return reply
async def __delitem__(self, name):
    """Delete the key ``name`` by awaiting ``delete``.

    The inner coroutine is awaited so the ``DEL`` command is actually
    issued.  Because this method is itself a coroutine function, a
    plain ``del client[name]`` only creates an un-awaited coroutine;
    callers need ``await client.__delitem__(name)``.
    """
    await self.delete(name)
async def dump(self, name):
    """Fetch a serialized version of the value stored at ``name``.

    Sends ``DUMP``; when the key does not exist the server answers
    with a nil reply.  Returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('DUMP', name)
    return reply
async def exists(self, name):
    """Check whether the key ``name`` exists (``EXISTS``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('EXISTS', name)
    return reply
# NOTE(review): ``exists`` is a coroutine function, so ``key in client``
# receives an un-awaited coroutine object (always truthy) rather than the
# EXISTS reply; use ``await client.exists(key)`` for a real answer.
__contains__ = exists
async def expire(self, name, time):
    """Set an expire flag on key ``name`` for ``time`` seconds.

    ``time`` may be an integer or a ``datetime.timedelta``; a timedelta
    is converted to whole seconds (its microseconds are ignored).

    Returns the reply produced by the client's ``call``.
    """
    seconds = time
    if isinstance(time, datetime.timedelta):
        seconds = time.days * 24 * 3600 + time.seconds
    return await self._client.call('EXPIRE', name, seconds)
async def expireat(self, name, when):
    """Set an expire flag on key ``name`` at the absolute time ``when``.

    ``when`` may be an integer unix timestamp or a ``datetime.datetime``;
    a datetime is converted with ``time.mktime`` on its time tuple, i.e.
    interpreted as local time and truncated to whole seconds.

    Returns the reply produced by the client's ``call``.
    """
    if isinstance(when, datetime.datetime):
        # Imported locally under the alias the conversion expects; the
        # alias also avoids confusion with the ``time`` method.
        import time as mod_time

        when = int(mod_time.mktime(when.timetuple()))
    return await self._client.call('EXPIREAT', name, when)
async def get_json(self, name):
    """Fetch the value at ``name`` and decode it as JSON.

    The value is obtained through ``get``.  A falsy reply (such as
    ``None``) is returned unchanged; anything else is decoded as UTF-8
    and parsed with ``json.loads``.
    """
    raw = await self.get(name)
    if not raw:
        return raw
    text = raw.decode('utf-8')
    return loads(text)
async def get(self, name):
    """Fetch the value stored at key ``name`` (``GET``).

    Returns the reply produced by the client's ``call``; a missing key
    is reported as ``None``.
    """
    reply = await self._client.call('GET', name)
    return reply
async def __getitem__(self, name):
    """Return the value at key ``name``, raising if it is missing.

    The value is fetched by awaiting ``get`` first, so the missing-key
    test is made on the actual reply rather than on the coroutine
    object.  Stored values that are merely falsy (e.g. an empty
    string) are returned normally.

    Raises:
        KeyError: if ``get`` reports ``None`` for ``name``.
    """
    value = await self.get(name)
    if value is None:
        raise KeyError(name)
    return value
async def getbit(self, name, offset):
    """Read the bit at ``offset`` in the value of ``name`` (``GETBIT``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('GETBIT', name, offset)
    return reply
async def getrange(self, key, start, end):
    """Fetch a substring of the string value stored at ``key``.

    Sends ``GETRANGE``; the offsets ``start`` and ``end`` are both
    inclusive.  Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('GETRANGE', key, start, end)
    return reply
async def getset(self, name, value):
    """Atomically set ``name`` to ``value`` and fetch the old value.

    Sends ``GETSET`` and returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('GETSET', name, value)
    return reply
async def incr(self, name, amount=1):
    """Increment the value of ``name`` by ``amount`` (``INCRBY``).

    If the key does not exist the value is initialised as ``amount``.
    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('INCRBY', name, amount)
    return reply
async def incrby(self, name, amount=1):
    """Increment the value of ``name`` by ``amount``.

    Alias of ``incr()``, which already issues the ``INCRBY`` command;
    the call is simply delegated.
    """
    result = await self.incr(name, amount)
    return result
async def incrbyfloat(self, name, amount=1.0):
    """Increment the value at ``name`` by the floating ``amount``.

    Sends ``INCRBYFLOAT``; if the key does not exist the value is
    initialised as ``amount``.  Returns the reply produced by the
    client's ``call``.
    """
    reply = await self._client.call('INCRBYFLOAT', name, amount)
    return reply
async def keys(self, pattern='*'):
    """List the keys matching ``pattern`` (``KEYS``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('KEYS', pattern)
    return reply
async def mget(self, keys, *args):
    """Fetch the values of several keys with ``MGET``.

    ``keys`` may be a single key or an iterable of keys; any extra
    positional ``args`` are treated as further keys.  The key list is
    built inline and the caller's ``keys`` object is never modified.

    Returns the reply produced by the client's ``call``.
    """
    # A str/bytes value is iterable but denotes one key, not a sequence.
    if isinstance(keys, (str, bytes)):
        names = [keys]
    else:
        try:
            names = list(keys)
        except TypeError:
            # Non-iterable value (e.g. an int): treat it as a single key.
            names = [keys]
    names.extend(args)
    return await self._client.call('MGET', *names)
async def mset(self, *args, **kwargs):
    """Set several keys at once with ``MSET``.

    The mapping may be given as a single dict positional argument, as
    keyword arguments, or both; entries from the dict override keyword
    arguments with the same name.

    Raises:
        RedisError: if positional arguments are given that are not
            exactly one dict.
    """
    if args:
        if len(args) != 1 or not isinstance(args[0], dict):
            raise RedisError('MSET requires **kwargs or a single dict arg')
        kwargs.update(args[0])
    # Flatten the mapping into key1, value1, key2, value2, ...
    items = []
    for pair in kwargs.items():
        items.extend(pair)
    return await self._client.call('MSET', *items)
async def msetnx(self, *args, **kwargs):
    """Set several keys with ``MSETNX`` (only if none already exist).

    The mapping may be given as a single dict positional argument, as
    keyword arguments, or both; entries from the dict override keyword
    arguments with the same name.

    Returns the reply produced by the client's ``call``.

    Raises:
        RedisError: if positional arguments are given that are not
            exactly one dict.
    """
    if args:
        if len(args) != 1 or not isinstance(args[0], dict):
            raise RedisError('MSETNX requires **kwargs or a single '
                             'dict arg')
        kwargs.update(args[0])
    # Flatten the mapping into key1, value1, key2, value2, ...
    items = []
    for pair in kwargs.items():
        items.extend(pair)
    return await self._client.call('MSETNX', *items)
async def move(self, name, db):
    """Move the key ``name`` to the database ``db`` (``MOVE``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('MOVE', name, db)
    return reply
async def persist(self, name):
    """Remove the expiration on ``name`` (``PERSIST``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('PERSIST', name)
    return reply
async def pexpire(self, name, time):
    """Set an expire flag on key ``name`` for ``time`` milliseconds.

    ``time`` may be an integer or a ``datetime.timedelta``; a timedelta
    is converted to whole milliseconds.

    Returns the reply produced by the client's ``call``.
    """
    millis = time
    if isinstance(time, datetime.timedelta):
        whole_seconds = time.days * 24 * 3600 + time.seconds
        millis = whole_seconds * 1000 + int(time.microseconds / 1000)
    return await self._client.call('PEXPIRE', name, millis)
async def pexpireat(self, name, when):
    """Set an expire flag on key ``name`` at the absolute time ``when``.

    ``when`` may be an integer unix time in milliseconds
    (unix time * 1000) or a ``datetime.datetime``; a datetime is
    converted with ``time.mktime`` on its time tuple (local time) plus
    its millisecond part.

    Returns the reply produced by the client's ``call``.
    """
    if isinstance(when, datetime.datetime):
        # Imported locally under the alias the conversion expects; the
        # alias also avoids confusion with the ``time`` method.
        import time as mod_time

        ms = int(when.microsecond / 1000)
        when = int(mod_time.mktime(when.timetuple())) * 1000 + ms
    return await self._client.call('PEXPIREAT', name, when)
async def psetex(self, name, time_ms, value):
    """Set ``name`` to ``value`` with an expiry of ``time_ms`` ms.

    ``time_ms`` may be an integer or a ``datetime.timedelta``; a
    timedelta is converted to whole milliseconds.

    Returns the reply produced by the client's ``call``.
    """
    millis = time_ms
    if isinstance(time_ms, datetime.timedelta):
        whole_seconds = time_ms.days * 24 * 3600 + time_ms.seconds
        millis = whole_seconds * 1000 + int(time_ms.microseconds / 1000)
    return await self._client.call('PSETEX', name, millis, value)
async def pttl(self, name):
    """Query the milliseconds left until ``name`` expires (``PTTL``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('PTTL', name)
    return reply
async def randomkey(self):
    """Fetch the name of a random key (``RANDOMKEY``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('RANDOMKEY')
    return reply
async def rename(self, src, dst):
    """Rename the key ``src`` to ``dst`` (``RENAME``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('RENAME', src, dst)
    return reply
async def renamenx(self, src, dst):
    """Rename ``src`` to ``dst`` only if ``dst`` does not exist yet.

    Sends ``RENAMENX`` and returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('RENAMENX', src, dst)
    return reply
async def restore(self, name, ttl, value):
    """Create the key ``name`` from a serialized ``value``.

    ``value`` is a payload previously obtained with ``DUMP``.  Sends
    ``RESTORE name ttl value`` and returns the reply produced by the
    client's ``call``.
    """
    reply = await self._client.call('RESTORE', name, ttl, value)
    return reply
async def set_json(self, name, value, ex=None, px=None, nx=False, xx=False):
    """Serialize ``value`` as JSON and store it at ``name``.

    The value is encoded with ``json.dumps(..., ensure_ascii=False)``
    and handed to ``set`` together with the ``ex``, ``px``, ``nx`` and
    ``xx`` options, whose result is returned.
    """
    payload = dumps(value, ensure_ascii=False)
    return await self.set(name, payload, ex, px, nx, xx)
async def set(self, name, value, ex=None, px=None, nx=False, xx=False):
    """Set the value at key ``name`` to ``value`` (``SET``).

    ``ex`` expiry in seconds (int or ``datetime.timedelta``).
    ``px`` expiry in milliseconds (int or ``datetime.timedelta``).
    ``nx`` if True, only set the key when it does not already exist.
    ``xx`` if True, only set the key when it already exists.

    Falsy ``ex`` / ``px`` values are treated as "no expiry".  Returns
    the reply produced by the client's ``call``.
    """
    command = ['SET', name, value]
    if ex:
        seconds = ex
        if isinstance(ex, datetime.timedelta):
            seconds = ex.days * 24 * 3600 + ex.seconds
        command += ['EX', seconds]
    if px:
        millis = px
        if isinstance(px, datetime.timedelta):
            whole_seconds = px.days * 24 * 3600 + px.seconds
            millis = whole_seconds * 1000 + int(px.microseconds / 1000)
        command += ['PX', millis]
    # Existence flags go last, in the same order as the parameters.
    for enabled, flag in ((nx, 'NX'), (xx, 'XX')):
        if enabled:
            command.append(flag)
    return await self._client.call(*command)
async def __setitem__(self, name, value):
    """Store ``value`` at key ``name`` by awaiting ``set``.

    The inner coroutine is awaited so the ``SET`` command is actually
    issued.  Because this method is itself a coroutine function, a
    plain ``client[name] = value`` only creates an un-awaited
    coroutine; callers need ``await client.__setitem__(name, value)``.
    """
    await self.set(name, value)
async def setbit(self, name, offset, value):
    """Set the bit at ``offset`` in ``name`` according to ``value``.

    Any truthy ``value`` is sent as 1 and any falsy one as 0
    (``SETBIT``).  Returns the reply produced by the client's ``call``
    (the server reports the previous bit).
    """
    bit = 1 if value else 0
    return await self._client.call('SETBIT', name, offset, bit)
async def setex(self, name, time, value):
    """Set ``name`` to ``value`` with an expiry of ``time`` seconds.

    ``time`` may be an integer or a ``datetime.timedelta``; a timedelta
    is converted to whole seconds (its microseconds are ignored).

    Returns the reply produced by the client's ``call``.
    """
    seconds = time
    if isinstance(time, datetime.timedelta):
        seconds = time.days * 24 * 3600 + time.seconds
    return await self._client.call('SETEX', name, seconds, value)
async def setnx(self, name, value):
    """Set ``name`` to ``value`` only if the key does not exist.

    Sends ``SETNX`` and returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('SETNX', name, value)
    return reply
async def setrange(self, name, offset, value):
    """Overwrite part of the value of ``name`` starting at ``offset``.

    Sends ``SETRANGE``.  If ``offset`` plus the length of ``value``
    exceeds the length of the original value, the new value is larger
    than before; if ``offset`` lies beyond the end of the original
    value, the gap is padded with null bytes.

    Returns the reply produced by the client's ``call`` (the server
    reports the length of the new string).
    """
    reply = await self._client.call('SETRANGE', name, offset, value)
    return reply
async def strlen(self, name):
    """Query the number of bytes stored at ``name`` (``STRLEN``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('STRLEN', name)
    return reply
async def substr(self, name, start, end=-1):
    """Fetch a substring of the string at key ``name`` (``SUBSTR``).

    ``start`` and ``end`` are 0-based integers selecting the portion of
    the string to return; ``end`` defaults to -1.

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('SUBSTR', name, start, end)
    return reply
async def ttl(self, name):
    """Query the seconds left until ``name`` expires (``TTL``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('TTL', name)
    return reply
async def type(self, name):
    """Query the type of the key ``name`` (``TYPE``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('TYPE', name)
    return reply
async def watch(self, *names):
    """Deprecated placeholder for ``WATCH``.

    No command is sent; the method only emits a ``DeprecationWarning``
    telling callers to use a Pipeline object, and returns ``None``.
    """
    # Imported locally so the method does not depend on a module-level
    # ``warnings`` import.
    import warnings

    warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))
async def unwatch(self):
    """Deprecated placeholder for ``UNWATCH``.

    No command is sent; the method only emits a ``DeprecationWarning``
    telling callers to use a Pipeline object, and returns ``None``.
    """
    # Imported locally so the method does not depend on a module-level
    # ``warnings`` import.
    import warnings

    warnings.warn(
        DeprecationWarning('Call UNWATCH from a Pipeline object'))
# LIST COMMANDS
async def blpop(self, keys, timeout=0):
    """LPOP a value off the first non-empty list named in ``keys``.

    ``keys`` may be a single key (``str`` or ``bytes``) or an iterable
    of keys.  If none of the lists has a value, the server blocks for
    ``timeout`` seconds or until a value is pushed onto one of them.
    A ``timeout`` of 0 (also used when ``None`` is passed) blocks
    indefinitely.

    Returns the reply produced by the client's ``call``.
    """
    if timeout is None:
        timeout = 0
    # A str/bytes value is iterable but denotes one key, not a sequence.
    if isinstance(keys, (str, bytes)):
        command = [keys]
    else:
        command = list(keys)
    command.append(timeout)
    return await self._client.call('BLPOP', *command)
async def brpop(self, keys, timeout=0):
    """RPOP a value off the first non-empty list named in ``keys``.

    ``keys`` may be a single key (``str`` or ``bytes``) or an iterable
    of keys.  If none of the lists has a value, the server blocks for
    ``timeout`` seconds or until a value is pushed onto one of them.
    A ``timeout`` of 0 (also used when ``None`` is passed) blocks
    indefinitely.

    Returns the reply produced by the client's ``call``.
    """
    if timeout is None:
        timeout = 0
    # A str/bytes value is iterable but denotes one key, not a sequence.
    if isinstance(keys, (str, bytes)):
        command = [keys]
    else:
        command = list(keys)
    command.append(timeout)
    return await self._client.call('BRPOP', *command)
async def brpoplpush(self, src, dst, timeout=0):
    """Pop from the tail of ``src`` and push onto the head of ``dst``.

    Sends ``BRPOPLPUSH``.  The command blocks until a value is
    available in ``src`` or ``timeout`` seconds elapse, whichever comes
    first; a ``timeout`` of 0 (also used when ``None`` is passed)
    blocks forever.

    Returns the reply produced by the client's ``call``.
    """
    wait = 0 if timeout is None else timeout
    return await self._client.call('BRPOPLPUSH', src, dst, wait)
async def lindex(self, name, index):
    """Fetch the item at position ``index`` of list ``name``.

    Sends ``LINDEX``; negative indexes are supported and address items
    from the end of the list.  Returns the reply produced by the
    client's ``call``.
    """
    reply = await self._client.call('LINDEX', name, index)
    return reply
async def linsert(self, name, where, refvalue, value):
    """Insert ``value`` into list ``name`` relative to ``refvalue``.

    ``where`` selects whether the insertion happens immediately before
    or after ``refvalue`` (``LINSERT``).  The server reports the new
    length of the list, or -1 if ``refvalue`` is not in the list.

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('LINSERT', name, where, refvalue, value)
    return reply
async def llen(self, name):
    """Query the length of the list ``name`` (``LLEN``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('LLEN', name)
    return reply
async def lpop(self, name):
    """Remove and return the first item of the list ``name`` (``LPOP``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('LPOP', name)
    return reply
async def lpush(self, name, *values):
    """Push ``values`` onto the head of the list ``name`` (``LPUSH``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('LPUSH', name, *values)
    return reply
async def lpushx(self, name, value):
    """Push ``value`` onto the head of ``name`` if the list exists.

    Sends ``LPUSHX`` and returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('LPUSHX', name, value)
    return reply
async def lrange(self, name, start, end):
    """Fetch the slice of list ``name`` between ``start`` and ``end``.

    Sends ``LRANGE``; ``start`` and ``end`` can be negative numbers,
    just like Python slicing notation.  Returns the reply produced by
    the client's ``call``.
    """
    reply = await self._client.call('LRANGE', name, start, end)
    return reply
async def lrem(self, name, count, value):
    """Remove occurrences of ``value`` from the list ``name`` (``LREM``).

    ``count`` controls the removal:
        count > 0: remove matching elements moving from head to tail.
        count < 0: remove matching elements moving from tail to head.
        count = 0: remove all matching elements.

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('LREM', name, count, value)
    return reply
async def lset(self, name, index, value):
    """Set position ``index`` of list ``name`` to ``value`` (``LSET``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('LSET', name, index, value)
    return reply
async def ltrim(self, name, start, end):
    """Trim list ``name`` to the slice between ``start`` and ``end``.

    Sends ``LTRIM``; all values outside the slice are removed.
    ``start`` and ``end`` can be negative numbers, just like Python
    slicing notation.  Returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('LTRIM', name, start, end)
    return reply
async def rpop(self, name):
    """Remove and return the last item of the list ``name`` (``RPOP``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('RPOP', name)
    return reply
async def rpoplpush(self, src, dst):
    """Atomically RPOP from ``src`` and LPUSH onto ``dst``.

    Sends ``RPOPLPUSH`` and returns the reply produced by the client's
    ``call`` (the server reports the moved value).
    """
    reply = await self._client.call('RPOPLPUSH', src, dst)
    return reply
async def rpush(self, name, *values):
    """Push ``values`` onto the tail of the list ``name`` (``RPUSH``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('RPUSH', name, *values)
    return reply
async def rpushx(self, name, value):
    """Push ``value`` onto the tail of ``name`` if the list exists.

    Sends ``RPUSHX`` and returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('RPUSHX', name, value)
    return reply
async def sort(self, name, start=None, num=None, by=None, get=None,
               desc=False, alpha=False, store=None, groups=False):
    """Sort the list, set or sorted set at ``name`` (``SORT``).

    ``start`` and ``num`` allow paging through the sorted data and
        must be given together.
    ``by`` uses an external key to weight and sort the items; use an
        "*" to indicate where in the key the item value is located.
    ``get`` fetches items from external keys rather than the sorted
        data itself; either one pattern (``str``/``bytes``) or an
        iterable of patterns, each using "*" as the item placeholder.
    ``desc`` reverses the sort.
    ``alpha`` sorts lexicographically rather than numerically.
    ``store`` stores the result of the sort into the key ``store``.
    ``groups`` requires ``get`` to contain at least two patterns; the
        number of patterns is forwarded to ``call`` as the ``groups``
        keyword option (``None`` when ``groups`` is false).

    All command keywords (``BY``, ``LIMIT``, ...) are sent as plain
    strings.

    Raises:
        RedisError: if only one of ``start`` / ``num`` is given.
        DataError: if ``groups`` is set but ``get`` does not hold at
            least two patterns.
    """
    if (start is None) != (num is None):
        raise RedisError("``start`` and ``num`` must both be specified")

    # A str/bytes ``get`` is one pattern; it must not be iterated
    # character by character.
    single_get = isinstance(get, (str, bytes))

    pieces = [name]
    if by is not None:
        pieces.extend(['BY', by])
    if start is not None:
        pieces.extend(['LIMIT', start, num])
    if get is not None:
        for pattern in ([get] if single_get else get):
            pieces.extend(['GET', pattern])
    if desc:
        pieces.append('DESC')
    if alpha:
        pieces.append('ALPHA')
    if store is not None:
        pieces.extend(['STORE', store])

    if groups and (not get or single_get or len(get) < 2):
        # Imported locally: only ``RedisError`` is imported at module
        # level in the visible part of this file.
        from redis import DataError

        raise DataError('when using "groups" the "get" argument '
                        'must be specified and contain at least '
                        'two keys')

    options = {'groups': len(get) if groups else None}
    return await self._client.call('SORT', *pieces, **options)
# SCAN COMMANDS
async def scan(self, cursor=0, match=None, count=None):
    """Incrementally iterate over key names with ``SCAN``.

    ``cursor`` is the scan position to continue from (0 starts a new
        scan).
    ``match`` filters the keys by pattern.
    ``count`` hints the minimum number of items per call.

    The ``MATCH`` / ``COUNT`` keywords are sent as plain strings.
    Returns the reply produced by the client's ``call`` (the server
    reports the next cursor together with a batch of keys).
    """
    pieces = [cursor]
    if match is not None:
        pieces.extend(['MATCH', match])
    if count is not None:
        pieces.extend(['COUNT', count])
    return await self._client.call('SCAN', *pieces)
#async def scan_iter(self, match=None, count=None):
# """
# Make an iterator using the SCAN command so that the client doesn't
# need to remember the cursor position.
# ``match`` allows for filtering the keys by pattern
# ``count`` allows for hint the minimum number of returns
# """
# cursor = '0'
# while cursor != 0:
# cursor, data = self.scan(cursor=cursor, match=match, count=count)
# for item in data:
# yield item
async def sscan(self, name, cursor=0, match=None, count=None):
    """Incrementally iterate over the elements of set ``name``.

    ``cursor`` is the scan position to continue from (0 starts a new
        scan).
    ``match`` filters the elements by pattern.
    ``count`` hints the minimum number of items per call.

    The ``MATCH`` / ``COUNT`` keywords are sent as plain strings.
    Returns the reply produced by the client's ``call`` (the server
    reports the next cursor together with a batch of elements).
    """
    pieces = [name, cursor]
    if match is not None:
        pieces.extend(['MATCH', match])
    if count is not None:
        pieces.extend(['COUNT', count])
    return await self._client.call('SSCAN', *pieces)
# async def sscan_iter(self, name, match=None, count=None):
# """
# Make an iterator using the SSCAN command so that the client doesn't
# need to remember the cursor position.
# ``match`` allows for filtering the keys by pattern
# ``count`` allows for hint the minimum number of returns
# """
# cursor = '0'
# while cursor != 0:
# cursor, data = self.sscan(name, cursor=cursor,
# match=match, count=count)
# for item in data:
# yield item
async def hscan(self, name, cursor=0, match=None, count=None):
    """Incrementally iterate over the fields of hash ``name``.

    ``cursor`` is the scan position to continue from (0 starts a new
        scan).
    ``match`` filters the keys by pattern.
    ``count`` hints the minimum number of items per call.

    The ``MATCH`` / ``COUNT`` keywords are sent as plain strings.
    Returns the reply produced by the client's ``call`` (the server
    reports the next cursor together with a batch of key/value data).
    """
    pieces = [name, cursor]
    if match is not None:
        pieces.extend(['MATCH', match])
    if count is not None:
        pieces.extend(['COUNT', count])
    return await self._client.call('HSCAN', *pieces)
# async def hscan_iter(self, name, match=None, count=None):
# """
# Make an iterator using the HSCAN command so that the client doesn't
# need to remember the cursor position.
# ``match`` allows for filtering the keys by pattern
# ``count`` allows for hint the minimum number of returns
# """
# cursor = '0'
# while cursor != 0:
# cursor, data = self.hscan(name, cursor=cursor,
# match=match, count=count)
# for item in data.items():
# yield item
async def zscan(self, name, cursor=0, match=None, count=None,
                score_cast_func=float):
    """Incrementally iterate over the members of sorted set ``name``.

    ``cursor`` is the scan position to continue from (0 starts a new
        scan).
    ``match`` filters the members by pattern.
    ``count`` hints the minimum number of items per call.
    ``score_cast_func`` is forwarded to ``call`` as a keyword option;
        no casting is performed in this method itself.

    The ``MATCH`` / ``COUNT`` keywords are sent as plain strings.
    Returns the reply produced by the client's ``call``.
    """
    pieces = [name, cursor]
    if match is not None:
        pieces.extend(['MATCH', match])
    if count is not None:
        pieces.extend(['COUNT', count])
    options = {'score_cast_func': score_cast_func}
    return await self._client.call('ZSCAN', *pieces, **options)
# async def zscan_iter(self, name, match=None, count=None,
# score_cast_func=float):
# """
# Make an iterator using the ZSCAN command so that the client doesn't
# need to remember the cursor position.
# ``match`` allows for filtering the keys by pattern
# ``count`` allows for hint the minimum number of returns
# ``score_cast_func`` a callable used to cast the score return value
# """
# cursor = '0'
# while cursor != 0:
# cursor, data = self.zscan(name, cursor=cursor, match=match,
# count=count,
# score_cast_func=score_cast_func)
# for item in data:
# yield item
# SET COMMANDS
async def sadd(self, name, *values):
    """Add one or more ``values`` to the set ``name`` (``SADD``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('SADD', name, *values)
    return reply
async def scard(self, name):
    """Query the number of elements in the set ``name`` (``SCARD``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('SCARD', name)
    return reply
async def sdiff(self, keys, *args):
    """Compute the difference of the sets named by ``keys`` (``SDIFF``).

    ``keys`` may be a single key or an iterable of keys; any extra
    positional ``args`` are treated as further keys.  The key list is
    built inline and the caller's ``keys`` object is never modified.

    Returns the reply produced by the client's ``call``.
    """
    # A str/bytes value is iterable but denotes one key, not a sequence.
    if isinstance(keys, (str, bytes)):
        names = [keys]
    else:
        try:
            names = list(keys)
        except TypeError:
            # Non-iterable value (e.g. an int): treat it as a single key.
            names = [keys]
    names.extend(args)
    return await self._client.call('SDIFF', *names)
async def sdiffstore(self, dest, keys, *args):
    """Store the difference of the sets named by ``keys`` in ``dest``.

    Sends ``SDIFFSTORE``.  ``keys`` may be a single key or an iterable
    of keys; any extra positional ``args`` are treated as further keys.
    The key list is built inline and the caller's ``keys`` object is
    never modified.

    Returns the reply produced by the client's ``call`` (the server
    reports the size of the new set).
    """
    # A str/bytes value is iterable but denotes one key, not a sequence.
    if isinstance(keys, (str, bytes)):
        names = [keys]
    else:
        try:
            names = list(keys)
        except TypeError:
            # Non-iterable value (e.g. an int): treat it as a single key.
            names = [keys]
    names.extend(args)
    return await self._client.call('SDIFFSTORE', dest, *names)
async def sinter(self, keys, *args):
    """Compute the intersection of the sets named by ``keys``.

    Sends ``SINTER``.  ``keys`` may be a single key or an iterable of
    keys; any extra positional ``args`` are treated as further keys.
    The key list is built inline and the caller's ``keys`` object is
    never modified.

    Returns the reply produced by the client's ``call``.
    """
    # A str/bytes value is iterable but denotes one key, not a sequence.
    if isinstance(keys, (str, bytes)):
        names = [keys]
    else:
        try:
            names = list(keys)
        except TypeError:
            # Non-iterable value (e.g. an int): treat it as a single key.
            names = [keys]
    names.extend(args)
    return await self._client.call('SINTER', *names)
async def sinterstore(self, dest, keys, *args):
    """Store the intersection of the sets named by ``keys`` in ``dest``.

    Sends ``SINTERSTORE``.  ``keys`` may be a single key or an iterable
    of keys; any extra positional ``args`` are treated as further keys.
    The key list is built inline and the caller's ``keys`` object is
    never modified.

    Returns the reply produced by the client's ``call`` (the server
    reports the size of the new set).
    """
    # A str/bytes value is iterable but denotes one key, not a sequence.
    if isinstance(keys, (str, bytes)):
        names = [keys]
    else:
        try:
            names = list(keys)
        except TypeError:
            # Non-iterable value (e.g. an int): treat it as a single key.
            names = [keys]
    names.extend(args)
    return await self._client.call('SINTERSTORE', dest, *names)
async def sismember(self, name, value):
    """Check whether ``value`` is a member of the set ``name``.

    Sends ``SISMEMBER`` and returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('SISMEMBER', name, value)
    return reply
async def smembers(self, name):
    """Fetch all members of the set ``name`` (``SMEMBERS``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('SMEMBERS', name)
    return reply
async def smove(self, src, dst, value):
    """Atomically move ``value`` from set ``src`` to set ``dst``.

    Sends ``SMOVE`` and returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('SMOVE', src, dst, value)
    return reply
async def spop(self, name):
    """Remove and return a random member of the set ``name`` (``SPOP``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('SPOP', name)
    return reply
async def srandmember(self, name, number=None):
    """Fetch random member(s) of the set ``name`` (``SRANDMEMBER``).

    If ``number`` is ``None`` the command is sent without a count and
    the server picks a single member.  Any other ``number`` --
    including 0 -- is sent as the count argument (available on
    Redis 2.6+).

    Returns the reply produced by the client's ``call``.
    """
    # Test against None explicitly so an explicit count of 0 is not
    # silently dropped.
    args = [] if number is None else [number]
    return await self._client.call('SRANDMEMBER', name, *args)
async def srem(self, name, *values):
    """Remove ``values`` from the set ``name`` (``SREM``).

    Returns the reply produced by the client's ``call``.
    """
    reply = await self._client.call('SREM', name, *values)
    return reply
async def sunion(self, keys, *args):
    """Compute the union of the sets named by ``keys`` (``SUNION``).

    ``keys`` may be a single key or an iterable of keys; any extra
    positional ``args`` are treated as further keys.  The key list is
    built inline and the caller's ``keys`` object is never modified.

    Returns the reply produced by the client's ``call``.
    """
    # A str/bytes value is iterable but denotes one key, not a sequence.
    if isinstance(keys, (str, bytes)):
        names = [keys]
    else:
        try:
            names = list(keys)
        except TypeError:
            # Non-iterable value (e.g. an int): treat it as a single key.
            names = [keys]
    names.extend(args)
    return await self._client.call('SUNION', *names)
async def sunionstore(self, dest, keys, *args):
    """Store the union of the sets named by ``keys`` in ``dest``.

    Sends ``SUNIONSTORE``.  ``keys`` may be a single key or an iterable
    of keys; any extra positional ``args`` are treated as further keys.
    The key list is built inline and the caller's ``keys`` object is
    never modified.

    Returns the reply produced by the client's ``call`` (the server
    reports the size of the new set).
    """
    # A str/bytes value is iterable but denotes one key, not a sequence.
    if isinstance(keys, (str, bytes)):
        names = [keys]
    else:
        try:
            names = list(keys)
        except TypeError:
            # Non-iterable value (e.g. an int): treat it as a single key.
            names = [keys]
    names.extend(args)
    return await self._client.call('SUNIONSTORE', dest, *names)
# SORTED SET COMMANDS
async def zadd(self, name, *args, **kwargs):
    """Add score/member pairs to the sorted set ``name`` (``ZADD``).

    Pairs can be specified in two ways:
        as ``*args``: score1, name1, score2, name2, ...
        as ``**kwargs``: name1=score1, name2=score2, ...

    The following call adds four values to the 'my-key' key:
        zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4)

    Positional pairs are sent first, followed by the keyword pairs
    (each as score, then member).

    Raises:
        RedisError: if an odd number of positional arguments is given.
    """
    pieces = []
    if args:
        if len(args) % 2 != 0:
            raise RedisError("ZADD requires an equal number of "
                             "values and scores")
        pieces.extend(args)
    # Keyword pairs arrive as member=score; the command wants
    # score before member.
    for member, score in kwargs.items():
        pieces.extend((score, member))
    return await self._client.call('ZADD', name, *pieces)
async def zcard(self, name):
    """Query the number of elements in the sorted set ``name``.

    Sends ``ZCARD`` and returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('ZCARD', name)
    return reply
async def zcount(self, name, min, max):
    """Count members of sorted set ``name`` scored ``min``..``max``.

    Sends ``ZCOUNT`` and returns the reply produced by the client's
    ``call``.
    """
    reply = await self._client.call('ZCOUNT', name, min, max)
    return reply
async def zincrby(self, name, value, amount=1):
    """Increment the score of ``value`` in ``name`` by ``amount``.

    Note the wire order: ``ZINCRBY name amount value`` -- the amount
    precedes the member even though the parameters are declared the
    other way round.

    Returns the reply produced by the client's ``call``.
    """
    command = ('ZINCRBY', name, amount, value)
    return await self._client.call(*command)
async def zinterstore(self, dest, keys, aggregate=None):
    """Intersect the sorted sets ``keys`` into the sorted set ``dest``.

    Scores in the destination are aggregated according to
    ``aggregate`` (SUM when none is provided).  The work is delegated
    to ``self._zaggregate`` with the ``ZINTERSTORE`` command name.
    """
    result = await self._zaggregate('ZINTERSTORE', dest, keys, aggregate)
    return result
async def zlexcount(self, name, min, max):
    """
    Count the items of sorted set ``name`` that fall inside the
    lexicographical range ``min`` .. ``max``.

    Sends ``ZLEXCOUNT name min max`` and returns the reply unchanged.
    """
    command = ('ZLEXCOUNT', name, min, max)
    return await self._client.call(*command)
async def zrangebylex(self, name, min, max, start=None, num=None):
    """
    Return the lexicographical range of values from sorted set ``name``
    between ``min`` and ``max``.

    If ``start`` and ``num`` are both given, a ``LIMIT start num`` clause
    is appended so that only that slice of the range is requested.

    Raises ``RedisError`` if only one of ``start`` / ``num`` is given.
    Returns the reply of the ZRANGEBYLEX call unchanged.
    """
    # Exactly one of the two being None means a half-specified slice.
    if (start is None) != (num is None):
        raise RedisError("``start`` and ``num`` must both be specified")
    pieces = ['ZRANGEBYLEX', name, min, max]
    if start is not None:
        # Plain string keyword, as ``_zrange`` does for WITHSCORES; the
        # ``Token`` wrapper used previously is not available in this module.
        pieces.extend(['LIMIT', start, num])
    return await self._client.call(*pieces)
async def zrangebyscore(self, name, min, max, start=None, num=None,
                        withscores=False, score_cast_func=float):
    """
    Return a range of values from the sorted set ``name`` with scores
    between ``min`` and ``max``.

    If ``start`` and ``num`` are both given, a ``LIMIT start num`` clause
    is appended so that only that slice of the range is requested.

    ``withscores`` appends ``WITHSCORES`` to the command; the flat
    ``member, score, member, score, ...`` reply is then regrouped into a
    list of ``(member, score)`` pairs, each score passed through
    ``score_cast_func``.

    Raises ``RedisError`` if only one of ``start`` / ``num`` is given.
    A reply that is not a list (for example an error object handed back
    by the client) is returned untouched.
    """
    # Exactly one of the two being None means a half-specified slice.
    if (start is None) != (num is None):
        raise RedisError("``start`` and ``num`` must both be specified")
    pieces = ['ZRANGEBYSCORE', name, min, max]
    if start is not None:
        pieces.extend(['LIMIT', start, num])
    if withscores:
        pieces.append('WITHSCORES')
    # The pairing is done here rather than by handing ``withscores`` /
    # ``score_cast_func`` to the client as keyword options: the client
    # has no response callbacks that would act on them.
    reply = await self._client.call(*pieces)
    if withscores and isinstance(reply, (list, tuple)):
        reply = [(member, score_cast_func(score))
                 for member, score in zip(reply[::2], reply[1::2])]
    return reply
async def zrank(self, name, value):
    """
    Look up the 0-based rank of ``value`` in sorted set ``name``.

    Sends ``ZRANK name value`` and returns the reply unchanged.
    """
    reply = await self._client.call('ZRANK', name, value)
    return reply
async def zrem(self, name, *values):
    """
    Remove the members ``values`` from sorted set ``name``.

    Sends ``ZREM name value [value ...]`` and returns the reply
    unchanged.
    """
    command = ('ZREM', name) + values
    return await self._client.call(*command)
async def zremrangebylex(self, name, min, max):
    """
    Remove every element of sorted set ``name`` inside the
    lexicographical range ``min`` .. ``max``.

    Sends ``ZREMRANGEBYLEX name min max`` and returns the reply (the
    number of elements removed, per the command) unchanged.
    """
    command = ('ZREMRANGEBYLEX', name, min, max)
    return await self._client.call(*command)
async def zremrangebyrank(self, name, min, max):
    """
    Remove every element of sorted set ``name`` whose rank lies between
    ``min`` and ``max``.

    Ranks are 0-based and ordered from the smallest score to the
    largest; negative values count from the highest scores.  Sends
    ``ZREMRANGEBYRANK name min max`` and returns the reply (the number
    of elements removed, per the command) unchanged.
    """
    reply = await self._client.call('ZREMRANGEBYRANK', name, min, max)
    return reply
async def zremrangebyscore(self, name, min, max):
    """
    Remove every element of sorted set ``name`` whose score lies between
    ``min`` and ``max``.

    Sends ``ZREMRANGEBYSCORE name min max`` and returns the reply (the
    number of elements removed, per the command) unchanged.
    """
    command = ('ZREMRANGEBYSCORE', name, min, max)
    return await self._client.call(*command)
async def zrevrange(self, name, start, end, withscores=False,
                    score_cast_func=float):
    """
    Fetch the members of sorted set ``name`` between ranks ``start`` and
    ``end`` using the ``ZREVRANGE`` command.

    All arguments are forwarded to ``_zrange``, which also handles
    ``withscores`` and ``score_cast_func``; its result is returned
    unchanged.
    """
    members = await self._zrange(
        name, start, end, withscores, score_cast_func, "ZREVRANGE")
    return members
async def zrange(self, name, start, end, withscores=False,
                 score_cast_func=float):
    """
    Fetch the members of sorted set ``name`` between ranks ``start`` and
    ``end`` using the ``ZRANGE`` command.

    All arguments are forwarded to ``_zrange``, which also handles
    ``withscores`` and ``score_cast_func``; its result is returned
    unchanged.
    """
    members = await self._zrange(
        name, start, end, withscores, score_cast_func, "ZRANGE")
    return members
async def _zrange(self, name, start, end, withscores=False,
score_cast_func=float, action="ZREVRANGE"):
"""
Return a range of values from sorted set ``name`` between
``start`` and ``end`` sorted in descending order.
``start`` and ``end`` can be negative, indicating the end of the range.
``withscores`` indicates to return await the scores along with the values
The return await type is a list of (value, score) pairs
``score_cast_func`` a callable used to cast the score return await value
"""
pieces = [action, name, start, end]
if withscores:
pieces.append('WITHSCORES')
r = await self._client.call(*pieces)
if withscores:
t = []
result = []
for pos,i in enumerate(r):
if pos%2:
t.append(score_cast_func(i))
result.append(tuple(t))
t = []
else:
t.append(i)
r = result
return r
async def zrevrangebyscore(self, name, max, min, start=None, num=None,
                           withscores=False, score_cast_func=float):
    """
    Return a range of values from the sorted set ``name`` with scores
    between ``min`` and ``max`` in descending order.

    Note the argument order: ``max`` comes before ``min``, matching the
    order in which they are sent after ``ZREVRANGEBYSCORE name``.

    If ``start`` and ``num`` are both given, a ``LIMIT start num`` clause
    is appended so that only that slice of the range is requested.

    ``withscores`` appends ``WITHSCORES`` to the command; the flat
    ``member, score, member, score, ...`` reply is then regrouped into a
    list of ``(member, score)`` pairs, each score passed through
    ``score_cast_func``.

    Raises ``RedisError`` if only one of ``start`` / ``num`` is given.
    A reply that is not a list (for example an error object handed back
    by the client) is returned untouched.
    """
    # Exactly one of the two being None means a half-specified slice.
    if (start is None) != (num is None):
        raise RedisError("``start`` and ``num`` must both be specified")
    pieces = ['ZREVRANGEBYSCORE', name, max, min]
    if start is not None:
        pieces.extend(['LIMIT', start, num])
    if withscores:
        pieces.append('WITHSCORES')
    # The pairing is done here rather than by handing ``withscores`` /
    # ``score_cast_func`` to the client as keyword options: the client
    # has no response callbacks that would act on them.
    reply = await self._client.call(*pieces)
    if withscores and isinstance(reply, (list, tuple)):
        reply = [(member, score_cast_func(score))
                 for member, score in zip(reply[::2], reply[1::2])]
    return reply
async def zrevrank(self, name, value):
    """
    Look up the 0-based descending rank of ``value`` in sorted set
    ``name``.

    Sends ``ZREVRANK name value`` and returns the reply unchanged.
    """
    reply = await self._client.call('ZREVRANK', name, value)
    return reply
async def zscore(self, name, value):
    """
    Fetch the score of element ``value`` in sorted set ``name``.

    Sends ``ZSCORE name value`` and returns the reply as received; no
    numeric conversion is applied here.
    """
    command = ('ZSCORE', name, value)
    return await self._client.call(*command)
async def zunionstore(self, dest, keys, aggregate=None):
    """
    Union the sorted sets given by ``keys`` into the sorted set ``dest``.

    ``aggregate`` names how scores are combined; when omitted, no
    AGGREGATE clause is sent.  The work is delegated to
    ``_zaggregate`` with the ``ZUNIONSTORE`` command and its result is
    returned unchanged.
    """
    outcome = await self._zaggregate('ZUNIONSTORE', dest, keys, aggregate)
    return outcome
async def _zaggregate(self, command, dest, keys, aggregate=None):
pieces = [command, dest, len(keys)]
if isinstance(keys, dict):
keys, weights = iterkeys(keys), itervalues(keys)
else:
weights = None
pieces.extend(keys)
if weights:
pieces.append(Token('WEIGHTS'))
pieces.extend(weights)
if aggregate:
pieces.append(Token('AGGREGATE'))
pieces.append(aggregate)
return await self._client.call(*pieces)
# HYPERLOGLOG COMMANDS
async def pfadd(self, name, *values):
    """
    Add the elements ``values`` to the HyperLogLog stored at ``name``.

    Sends ``PFADD name value [value ...]`` and returns the reply
    unchanged.
    """
    command = ('PFADD', name) + values
    return await self._client.call(*command)
async def pfcount(self, name):
    """
    Ask for the approximated cardinality of the set observed by the
    HyperLogLog at ``name``.

    Sends ``PFCOUNT name`` and returns the reply unchanged.
    """
    reply = await self._client.call('PFCOUNT', name)
    return reply
async def pfmerge(self, dest, *sources):
    """
    Merge the HyperLogLogs named by ``sources`` into ``dest``.

    Sends ``PFMERGE dest source [source ...]`` and returns the reply
    unchanged.
    """
    command = ('PFMERGE', dest) + sources
    return await self._client.call(*command)
# HASH COMMANDS
async def hdel(self, name, *keys):
    """
    Delete the fields ``keys`` from hash ``name``.

    Sends ``HDEL name key [key ...]`` and returns the reply unchanged.
    """
    command = ('HDEL', name) + keys
    return await self._client.call(*command)
async def hexists(self, name, key):
    """
    Ask whether field ``key`` exists within hash ``name``.

    Sends ``HEXISTS name key`` and returns the reply as received; it is
    not converted to a ``bool`` here.
    """
    reply = await self._client.call('HEXISTS', name, key)
    return reply
async def hget(self, name, key):
    """
    Fetch the value of field ``key`` within hash ``name``.

    Sends ``HGET name key`` and returns the reply unchanged.
    """
    command = ('HGET', name, key)
    return await self._client.call(*command)
async def hgetall(self, name):
    """
    Fetch all fields and values of hash ``name``.

    Sends ``HGETALL name`` and returns the reply as received; no
    conversion into a ``dict`` is performed here.
    """
    reply = await self._client.call('HGETALL', name)
    return reply
async def hincrby(self, name, key, amount=1):
    """
    Increase the value of field ``key`` in hash ``name`` by ``amount``
    (default 1).

    Sends ``HINCRBY name key amount`` and returns the reply unchanged.
    """
    command = ('HINCRBY', name, key, amount)
    return await self._client.call(*command)
async def hincrbyfloat(self, name, key, amount=1.0):
    """
    Increase the value of field ``key`` in hash ``name`` by the floating
    point ``amount`` (default 1.0).

    Sends ``HINCRBYFLOAT name key amount`` and returns the reply
    unchanged.
    """
    command = ('HINCRBYFLOAT', name, key, amount)
    return await self._client.call(*command)
async def hkeys(self, name):
    """
    Fetch the field names of hash ``name``.

    Sends ``HKEYS name`` and returns the reply unchanged.
    """
    reply = await self._client.call('HKEYS', name)
    return reply
async def hlen(self, name):
    """
    Ask for the number of fields in hash ``name``.

    Sends ``HLEN name`` and returns the reply unchanged.
    """
    reply = await self._client.call('HLEN', name)
    return reply
async def hset(self, name, key, value):
    """
    Set field ``key`` to ``value`` within hash ``name``.

    Sends ``HSET name key value`` and returns the reply unchanged
    (per the command: 1 if a new field was created, otherwise 0).
    """
    command = ('HSET', name, key, value)
    return await self._client.call(*command)
async def hsetnx(self, name, key, value):
    """
    Set field ``key`` to ``value`` within hash ``name`` only if ``key``
    does not exist yet.

    Sends ``HSETNX name key value`` and returns the reply unchanged
    (per the command: 1 if a field was created, otherwise 0).
    """
    command = ('HSETNX', name, key, value)
    return await self._client.call(*command)
async def hmset(self, name, mapping):
    """
    Set several fields of hash ``name`` at once.

    ``mapping`` is a dict of field -> value; its pairs are flattened into
    ``field1, value1, field2, value2, ...`` and sent after ``HMSET name``.

    Raises ``DataError`` if ``mapping`` is empty.  Returns the reply of
    the HMSET call unchanged.
    """
    if not mapping:
        # ``DataError`` is not imported at module level, so it is pulled
        # in here, on the only path that needs it.
        from redis import DataError
        raise DataError("'hmset' with 'mapping' of length 0")
    items = []
    # ``dict.items()`` replaces the Python 2 compat helper ``iteritems``,
    # which is not available in this module.
    for field, value in mapping.items():
        items.extend((field, value))
    return await self._client.call('HMSET', name, *items)
async def hmget(self, name, keys, *args):
    """
    Fetch the values of several fields of hash ``name``.

    ``keys`` and any extra positional ``args`` are merged by
    ``list_or_args`` and sent, in that order, after ``HMGET name``.  The
    reply is returned unchanged.
    """
    fields = list_or_args(keys, args)
    command = ('HMGET', name, *fields)
    return await self._client.call(*command)
async def hvals(self, name):
    """
    Fetch the values stored in hash ``name``.

    Sends ``HVALS name`` and returns the reply unchanged.
    """
    reply = await self._client.call('HVALS', name)
    return reply
async def publish(self, channel, message):
    """
    Publish ``message`` on ``channel``.

    Sends ``PUBLISH channel message`` and returns the reply unchanged
    (per the command: the number of subscribers the message reached).
    """
    command = ('PUBLISH', channel, message)
    return await self._client.call(*command)
async def eval(self, script, numkeys, *keys_and_args):
    """
    Execute the Lua ``script`` on the server.

    ``numkeys`` says how many of the leading entries in
    ``keys_and_args`` are key names; the remaining entries are argument
    values.  Sends ``EVAL script numkeys ...`` and returns the reply
    unchanged.

    This method exists for Redis API completion; ``register_script`` is
    the intended entry point in practice.
    """
    command = ('EVAL', script, numkeys) + keys_and_args
    return await self._client.call(*command)
async def evalsha(self, sha, numkeys, *keys_and_args):
    """
    Execute a Lua script that is already in the server's script cache,
    identified by its ``sha`` (as registered via EVAL or SCRIPT LOAD).

    ``numkeys`` says how many of the leading entries in
    ``keys_and_args`` are key names; the remaining entries are argument
    values.  Sends ``EVALSHA sha numkeys ...`` and returns the reply
    unchanged.

    This method exists for Redis API completion; ``register_script`` is
    the intended entry point in practice.
    """
    command = ('EVALSHA', sha, numkeys) + keys_and_args
    return await self._client.call(*command)
async def script_exists(self, *args):
    """
    Check whether scripts exist in the script cache.

    ``args`` are the SHAs of the scripts to look up.  Returns the reply
    of ``SCRIPT EXISTS`` unchanged (per the command: one flag per SHA,
    in the same order).
    """
    # The command and its subcommand are sent as two separate arguments:
    # every positional argument becomes its own element of the request,
    # so a single 'SCRIPT EXISTS' string would be taken as one (unknown)
    # command name.
    return await self._client.call('SCRIPT', 'EXISTS', *args)
async def script_flush(self):
    """
    Flush all scripts from the script cache.

    Returns the reply of ``SCRIPT FLUSH`` unchanged.
    """
    # The command and its subcommand are sent as two separate arguments:
    # every positional argument becomes its own element of the request,
    # so a single 'SCRIPT FLUSH' string would be taken as one (unknown)
    # command name.
    return await self._client.call('SCRIPT', 'FLUSH')
async def script_kill(self):
    """
    Kill the currently executing Lua script.

    Returns the reply of ``SCRIPT KILL`` unchanged.
    """
    # The command and its subcommand are sent as two separate arguments:
    # every positional argument becomes its own element of the request,
    # so a single 'SCRIPT KILL' string would be taken as one (unknown)
    # command name.
    return await self._client.call('SCRIPT', 'KILL')
async def script_load(self, script):
    """
    Load a Lua ``script`` into the script cache.

    Returns the reply of ``SCRIPT LOAD`` unchanged (per the command: the
    SHA of the loaded script).
    """
    # The command and its subcommand are sent as two separate arguments:
    # every positional argument becomes its own element of the request,
    # so a single 'SCRIPT LOAD' string would be taken as one (unknown)
    # command name.
    return await self._client.call('SCRIPT', 'LOAD', script)
async def register_script(self, script):
    """
    Register a Lua ``script`` for use through a ``Script`` object.

    A ``Script`` is built from this client and ``script`` and the result
    of awaiting it is returned.  The ``Script`` wrapper is meant to hide
    the handling of scripts, keys and SHAs and is the preferred way to
    work with Lua scripts.

    NOTE(review): ``Script`` is defined outside this part of the file;
    awaiting it assumes the object is awaitable -- confirm.
    """
    handle = Script(self, script)
    return await handle