# -*- coding: utf-8 -*- """ This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, see . @author: mkaay, RaNaN """ from time import time from traceback import print_exc from threading import Lock class CaptchaManager(): def __init__(self, core): self.lock = Lock() self.core = core self.tasks = [] #task store, for outgoing tasks only self.ids = 0 #only for internal purpose def newTask(self, img, format, file, result_type): task = CaptchaTask(self.ids, img, format, file, result_type) self.ids += 1 return task def removeTask(self, task): self.lock.acquire() if task in self.tasks: self.tasks.remove(task) self.lock.release() def getTask(self): self.lock.acquire() for task in self.tasks: if task.status in ("waiting", "shared-user"): self.lock.release() return task self.lock.release() return None def getTaskByID(self, tid): self.lock.acquire() for task in self.tasks: if task.id == str(tid): #task ids are strings self.lock.release() return task self.lock.release() return None def handleCaptcha(self, task): cli = self.core.isClientConnected() if cli: #client connected -> should solve the captcha task.setWaiting(50) #wait 50 sec for response for plugin in self.core.hookManager.activePlugins(): try: plugin.newCaptchaTask(task) except: if self.core.debug: print_exc() if task.handler or cli: #the captcha was handled self.tasks.append(task) return True task.error = _("No Client connected for captcha decrypting") return False class CaptchaTask(): def __init__(self, id, img, format, file, result_type='textual'): self.id = str(id) self.captchaImg = img self.captchaFormat = format self.captchaFile = file self.captchaResultType = result_type self.handler = [] #the hook plugins that will take care of the solution self.result = None self.waitUntil = None self.error = None #error message self.status = "init" self.data = {} #handler can store data here def getCaptcha(self): return self.captchaImg, self.captchaFormat, self.captchaResultType def setResult(self, text): if self.isTextual(): self.result = text if self.isPositional(): try: parts = text.split(',') self.result = (int(parts[0]), int(parts[1])) except: self.result = None def getResult(self): try: res = self.result.encode("utf8", "replace") except: res = self.result return res def getStatus(self): return self.status def setWaiting(self, sec): """ let the captcha wait secs for the solution """ self.waitUntil = max(time() + sec, self.waitUntil) self.status = "waiting" def isWaiting(self): if self.result or self.error or time() > self.waitUntil: return False return True def isTextual(self): """ returns if text is written on the captcha """ return self.captchaResultType == 'textual' def isPositional(self): """ returns if user have to click a specific region on the captcha """ return self.captchaResultType == 'positional' def setWatingForUser(self, exclusive): if exclusive: self.status = "user" else: self.status = "shared-user" def timedOut(self): return time() > self.waitUntil def invalid(self): """ indicates the captcha was not correct """ [x.captchaInvalid(self) for x in self.handler] def correct(self): [x.captchaCorrect(self) for x in self.handler] def __str__(self): return "" % self.id