summaryrefslogtreecommitdiffstats
path: root/pyload/manager/event
diff options
context:
space:
mode:
Diffstat (limited to 'pyload/manager/event')
-rw-r--r--pyload/manager/event/PullEvents.py120
-rw-r--r--pyload/manager/event/Scheduler.py141
-rw-r--r--pyload/manager/event/__init__.py0
3 files changed, 261 insertions, 0 deletions
diff --git a/pyload/manager/event/PullEvents.py b/pyload/manager/event/PullEvents.py
new file mode 100644
index 000000000..0739b4ec8
--- /dev/null
+++ b/pyload/manager/event/PullEvents.py
@@ -0,0 +1,120 @@
+# -*- coding: utf-8 -*-
+
+"""
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+
+ @author: mkaay
+"""
+
+from time import time
+from pyload.utils import uniqify
+
+class PullManager:
+ def __init__(self, core):
+ self.core = core
+ self.clients = []
+
+ def newClient(self, uuid):
+ self.clients.append(Client(uuid))
+
+ def clean(self):
+ for n, client in enumerate(self.clients):
+ if client.lastActive + 30 < time():
+ del self.clients[n]
+
+ def getEvents(self, uuid):
+ events = []
+ validUuid = False
+ for client in self.clients:
+ if client.uuid == uuid:
+ client.lastActive = time()
+ validUuid = True
+ while client.newEvents():
+ events.append(client.popEvent().toList())
+ break
+ if not validUuid:
+ self.newClient(uuid)
+ events = [ReloadAllEvent("queue").toList(), ReloadAllEvent("collector").toList()]
+ return uniqify(events)
+
+ def addEvent(self, event):
+ for client in self.clients:
+ client.addEvent(event)
+
+class Client:
+ def __init__(self, uuid):
+ self.uuid = uuid
+ self.lastActive = time()
+ self.events = []
+
+ def newEvents(self):
+ return len(self.events) > 0
+
+ def popEvent(self):
+ if not len(self.events):
+ return None
+ return self.events.pop(0)
+
+ def addEvent(self, event):
+ self.events.append(event)
+
+class UpdateEvent:
+ def __init__(self, itype, iid, destination):
+ assert itype == "pack" or itype == "file"
+ assert destination == "queue" or destination == "collector"
+ self.type = itype
+ self.id = iid
+ self.destination = destination
+
+ def toList(self):
+ return ["update", self.destination, self.type, self.id]
+
+class RemoveEvent:
+ def __init__(self, itype, iid, destination):
+ assert itype == "pack" or itype == "file"
+ assert destination == "queue" or destination == "collector"
+ self.type = itype
+ self.id = iid
+ self.destination = destination
+
+ def toList(self):
+ return ["remove", self.destination, self.type, self.id]
+
+class InsertEvent:
+ def __init__(self, itype, iid, after, destination):
+ assert itype == "pack" or itype == "file"
+ assert destination == "queue" or destination == "collector"
+ self.type = itype
+ self.id = iid
+ self.after = after
+ self.destination = destination
+
+ def toList(self):
+ return ["insert", self.destination, self.type, self.id, self.after]
+
+class ReloadAllEvent:
+ def __init__(self, destination):
+ assert destination == "queue" or destination == "collector"
+ self.destination = destination
+
+ def toList(self):
+ return ["reload", self.destination]
+
+class AccountUpdateEvent:
+ def toList(self):
+ return ["account"]
+
+class ConfigUpdateEvent:
+ def toList(self):
+ return ["config"]
diff --git a/pyload/manager/event/Scheduler.py b/pyload/manager/event/Scheduler.py
new file mode 100644
index 000000000..71b5f96af
--- /dev/null
+++ b/pyload/manager/event/Scheduler.py
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+
+"""
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+
+ @author: mkaay
+"""
+
+from time import time
+from heapq import heappop, heappush
+from thread import start_new_thread
+from threading import Lock
+
+class AlreadyCalled(Exception):
+ pass
+
+
+class Deferred:
+ def __init__(self):
+ self.call = []
+ self.result = ()
+
+ def addCallback(self, f, *cargs, **ckwargs):
+ self.call.append((f, cargs, ckwargs))
+
+ def callback(self, *args, **kwargs):
+ if self.result:
+ raise AlreadyCalled
+ self.result = (args, kwargs)
+ for f, cargs, ckwargs in self.call:
+ args += tuple(cargs)
+ kwargs.update(ckwargs)
+ f(*args ** kwargs)
+
+
+class Scheduler:
+ def __init__(self, core):
+ self.core = core
+
+ self.queue = PriorityQueue()
+
+ def addJob(self, t, call, args=[], kwargs={}, threaded=True):
+ d = Deferred()
+ t += time()
+ j = Job(t, call, args, kwargs, d, threaded)
+ self.queue.put((t, j))
+ return d
+
+
+ def removeJob(self, d):
+ """
+ :param d: defered object
+ :return: if job was deleted
+ """
+ index = -1
+
+ for i, j in enumerate(self.queue):
+ if j[1].deferred == d:
+ index = i
+
+ if index >= 0:
+ del self.queue[index]
+ return True
+
+ return False
+
+ def work(self):
+ while True:
+ t, j = self.queue.get()
+ if not j:
+ break
+ else:
+ if t <= time():
+ j.start()
+ else:
+ self.queue.put((t, j))
+ break
+
+
+class Job:
+ def __init__(self, time, call, args=[], kwargs={}, deferred=None, threaded=True):
+ self.time = float(time)
+ self.call = call
+ self.args = args
+ self.kwargs = kwargs
+ self.deferred = deferred
+ self.threaded = threaded
+
+ def run(self):
+ ret = self.call(*self.args, **self.kwargs)
+ if self.deferred is None:
+ return
+ else:
+ self.deferred.callback(ret)
+
+ def start(self):
+ if self.threaded:
+ start_new_thread(self.run, ())
+ else:
+ self.run()
+
+
+class PriorityQueue:
+ """ a non blocking priority queue """
+
+ def __init__(self):
+ self.queue = []
+ self.lock = Lock()
+
+ def __iter__(self):
+ return iter(self.queue)
+
+ def __delitem__(self, key):
+ del self.queue[key]
+
+ def put(self, element):
+ self.lock.acquire()
+ heappush(self.queue, element)
+ self.lock.release()
+
+ def get(self):
+ """ return element or None """
+ self.lock.acquire()
+ try:
+ el = heappop(self.queue)
+ return el
+ except IndexError:
+ return None, None
+ finally:
+ self.lock.release()
diff --git a/pyload/manager/event/__init__.py b/pyload/manager/event/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/pyload/manager/event/__init__.py