OXIESEC PANEL
- Current Dir:
/
/
opt
/
alt
/
python27
/
lib
/
python2.7
/
site-packages
/
postomaat
Server IP: 2a02:4780:11:1084:0:327f:3464:10
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
01/08/2025 10:42:48 AM
rwxr-xr-x
📄
__init__.py
712 bytes
03/16/2023 02:20:00 PM
rw-r--r--
📄
__init__.pyc
309 bytes
12/18/2024 09:49:00 AM
rw-r--r--
📄
addrcheck.py
4.1 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
addrcheck.pyc
5.29 KB
12/18/2024 09:49:00 AM
rw-r--r--
📁
bin
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
checkLogfileConfig.py
4.76 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
checkLogfileConfig.pyc
3.28 KB
12/18/2024 09:49:00 AM
rw-r--r--
📁
conf
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
core.py
27.75 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
core.pyc
23.77 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
daemon.py
5.05 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
daemon.pyc
4.73 KB
12/18/2024 09:49:00 AM
rw-r--r--
📁
extensions
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
funkyconsole.py
2.24 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
funkyconsole.pyc
2.12 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
logtools.py
12.22 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
logtools.pyc
11.35 KB
12/18/2024 09:49:00 AM
rw-r--r--
📁
plugins
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
procpool.py
10.58 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
procpool.pyc
9.79 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
scansession.py
11.69 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
scansession.pyc
12.12 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
service.py
14.48 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
service.pyc
11.49 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
shared.py
21.93 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
shared.pyc
24.04 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
stats.py
7.29 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
stats.pyc
7.55 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
stringencode.py
13.49 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
stringencode.pyc
10.66 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
threadpool.py
8.92 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
threadpool.pyc
8.19 KB
12/18/2024 09:49:00 AM
rw-r--r--
📁
utils
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
workaround.py
935 bytes
03/16/2023 02:20:00 PM
rw-r--r--
📄
workaround.pyc
667 bytes
12/18/2024 09:49:00 AM
rw-r--r--
Editing: threadpool.py
Close
#   Copyright 2009-2018 Oli Schacher
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
import threading
import time
import weakref
try:
    import queue
except ImportError:
    import Queue as queue
import logging
import postomaat.core
from postomaat.scansession import SessionHandler


class ThreadPool(threading.Thread):
    """Management thread that owns a bounded task queue and a set of Worker threads.

    The thread starts itself at the end of __init__. Its run() loop keeps the
    number of workers between minthreads and maxthreads and adds or removes
    one worker per check depending on the queue size.
    """

    def __init__(self, controller, minthreads=1, maxthreads=20, queuesize=100):
        """
        Args:
            controller: object providing 'config' and 'plugin_list_by_port(port)';
                only a weak reference to it is kept

        Keyword Args:
            minthreads (int): lower bound for the number of workers, must be > 0
            maxthreads (int): upper bound for the number of workers, must be >= minthreads
            queuesize (int): maximum number of queued tasks
        """
        self.workers = []
        self.queuesize = queuesize
        self.tasks = queue.Queue(queuesize)
        self.minthreads = minthreads
        self.maxthreads = maxthreads
        assert self.minthreads > 0
        assert self.maxthreads >= self.minthreads
        self.logger = logging.getLogger('%s.threadpool' % __package__)
        self.threadlistlock = threading.Lock()
        self.checkinterval = 10  # seconds slept between two checks in run()
        self.threadcounter = 0  # running number used to build worker ids
        self._stayalive = True
        self.laststats = 0  # timestamp of the last stats log line
        self.statinverval = 60  # seconds between stats log lines when nothing changed
        self.controller = weakref.ref(controller)  # keep a weak reference to controller
        threading.Thread.__init__(self)
        self.name = 'Threadpool'
        self.daemon = False
        self.start()

    @property
    def stayalive(self):
        """bool: False once the pool has been told to shut down"""
        return self._stayalive

    @stayalive.setter
    def stayalive(self, value):
        # threadpool is shut down -> send poison pill to workers
        if self._stayalive and not value:
            # clear the flag first so add_task() stops accepting work
            # before the pills are queued
            self._stayalive = False
            self._send_poison_pills()
        self._stayalive = value

    def _send_poison_pills(self):
        """flood the queue with poison pills to tell all workers to shut down"""
        for _ in range(self.maxthreads):
            try:
                self.tasks.put_nowait(None)
            except queue.Full:
                # Do not let a full queue abort the shutdown. _stayalive is
                # already False at this point, so get_task_sessionhandler()
                # returns None to every worker asking for its next task.
                self.logger.warning('Task queue is full, could not queue all poison pills')
                break

    def add_task(self, session):
        """Queue a task; silently ignored once the pool is shutting down.

        Blocks while the queue is full.
        """
        if self._stayalive:
            self.tasks.put(session)

    def add_task_from_socket(self, sock, port):
        """
        Consistent interface with procpool
        Add task to queue compressing the socket which is needed for multiprocessing

        Args:
            sock (socket): incoming socket
            port (int): port where message was received (needed for plugin list)
        """
        task = (postomaat.core.forking_dumps(sock), port)
        self.add_task(task)

    def get_task_sessionhandler(self):
        """Block until a task is available and wrap it in a SessionHandler.

        Returns:
            SessionHandler for the next queued (pickled socket, port) task, or
            None if a poison pill was received or the pool is shutting down
        """
        if not self._stayalive:
            return None

        task = self.tasks.get(True)
        if task is None:
            return None

        psock, port = task
        sock = postomaat.core.forking_load(psock)
        controller = self.controller()
        return SessionHandler(sock, controller.config, controller.plugin_list_by_port(port))

    def run(self):
        """Management loop: keep the number of workers within bounds and adapt it to the load"""
        self.logger.debug('Threadpool initializing. minthreads=%s maxthreads=%s maxqueue=%s checkinterval=%s' % (
            self.minthreads, self.maxthreads, self.queuesize, self.checkinterval))

        while self._stayalive:
            curthreads = self.workers
            numthreads = len(curthreads)

            # check the minimum boundary
            requiredminthreads = self.minthreads
            if numthreads < requiredminthreads:
                diff = requiredminthreads - numthreads
                self._add_worker(diff)
                continue

            # check the maximum boundary
            if numthreads > self.maxthreads:
                diff = numthreads - self.maxthreads
                self._remove_worker(diff)
                continue

            changed = False
            # ok, we are within the boundaries, now check if we can dynamically
            # adapt something
            queuesize = self.tasks.qsize()

            # if there are more tasks than current number of threads, we try to
            # increase
            # (numthreads is at least minthreads > 0 here, so no division by zero)
            workload = float(queuesize) / float(numthreads)

            if workload > 1 and numthreads < self.maxthreads:
                self._add_worker()
                numthreads += 1
                changed = True

            if workload < 1 and numthreads > self.minthreads:
                self._remove_worker()
                numthreads -= 1
                changed = True

            # log current stats
            if changed or time.time() - self.laststats > self.statinverval:
                workerlist = "\n%s" % '\n'.join(map(repr, self.workers))
                self.logger.debug('queuesize=%s workload=%.2f workers=%s workerlist=%s' % (
                    queuesize, workload, numthreads, workerlist))
                self.laststats = time.time()

            time.sleep(self.checkinterval)
        self.logger.info('Threadpool shut down')

    def _remove_worker(self, num=1):
        """Tell the oldest 'num' workers to stop, wait for each and drop it from the list.

        NOTE(review): a worker blocked in tasks.get() only sees the cleared flag
        after it has received its next task, so each join may wait the full
        120 seconds - confirm whether that is intended.
        """
        self.logger.debug('Removing %s workerthread(s)' % num)
        for _ in range(num):
            worker = self.workers[0]
            worker.stayalive = False
            worker.join(120)
            del self.workers[0]

    def _add_worker(self, num=1):
        """Create, register and start 'num' new workers"""
        self.logger.debug('Adding %s workerthread(s)' % num)
        for _ in range(num):
            self.threadcounter += 1
            worker = Worker("[%s]" % self.threadcounter, self)
            self.workers.append(worker)
            worker.start()

    def _drain_tasks(self):
        """Yield queued tasks until a poison pill is found or the queue is empty.

        Non-blocking on purpose: if no pill could be queued (full queue) a
        blocking get() would wait forever after the last task.
        """
        while True:
            try:
                task = self.tasks.get_nowait()
            except queue.Empty:
                return
            if task is None:  # poison pill
                return
            yield task

    def shutdown(self, newmanager=None):
        """
        Shutdown manager, transfer queue to a new manager if available.
        Otherwise mark messages as defer.

        Keyword Args:
            newmanager (ProcManager or ThreadPool): has to provide add_task accepting a pickled socket
        """
        # set stayalive to False, this will send
        # poison pills to the workers
        self.stayalive = False

        # now remove elements from the queue
        # first, put another poison pill for the Threadpool itself
        try:
            self.tasks.put_nowait(None)
        except queue.Full:
            # draining below stops at an earlier pill or at an empty queue
            self.logger.warning('Task queue is full, could not queue poison pill for the threadpool')

        # don't use the get_task_sessionhandler from Threadpool to drain since
        # this will not give anything once stayalive is False
        if newmanager:
            # new manager available. Transfer tasks
            # to new manager
            countmessages = 0
            for task in self._drain_tasks():
                newmanager.add_task(task)
                countmessages += 1
            self.logger.info("Moved %u messages to queue of new manager" % countmessages)
        else:
            # no new manager. Mark messages as defer.
            returnMessage = "Temporarily unavailable... Please try again later."
            markDeferCounter = 0
            for sesshandler in self._drain_tasks():
                # NOTE(review): tasks queued by add_task_from_socket() are
                # (pickled socket, port) tuples which have no 'protohandler';
                # confirm how such tasks should be deferred. Failures are
                # logged so that the workers below are joined in any case.
                try:
                    sesshandler.protohandler.defer(returnMessage)
                except Exception as e:
                    self.logger.error('Could not defer queued task: %s' % e)
                else:
                    markDeferCounter += 1
            self.logger.info("Marked %s messages as '%s' to close queue" % (markDeferCounter, returnMessage))

        # remove all the workers (joins them also)
        for worker in self.workers:
            worker.stayalive = False
            worker.join(120)  # wait 120 seconds max


class Worker(threading.Thread):
    """Thread that repeatedly fetches a session handler from its pool and runs it"""

    def __init__(self, workerid, pool):
        """
        Args:
            workerid (str): label used in the thread name, logger name and repr
            pool (ThreadPool): pool providing get_task_sessionhandler()
        """
        threading.Thread.__init__(self, name='Pool worker %s' % workerid)
        self.workerid = workerid
        self.birth = time.time()
        self.pool = pool
        self.stayalive = True
        self.logger = logging.getLogger('%s.threads.worker.%s' % (__package__, workerid))
        self.logger.debug('thread init')
        self.noisy = False  # enables per-task debug logging in run()
        self.daemon = False
        self.workerstate = 'created'

    def __repr__(self):
        return "%s: %s" % (self.workerid, self.workerstate)

    def run(self):
        """Process tasks until stayalive is cleared or a poison pill (None) is received"""
        self.logger.debug('thread start')

        while self.stayalive:
            self.workerstate = 'waiting for task'
            if self.noisy:
                self.logger.debug('Getting new task...')
            sesshandler = self.pool.get_task_sessionhandler()
            if sesshandler is None:
                # poison pill -> shut down
                if self.noisy:
                    self.logger.debug("got a poison pill .. good bye world")
                self.stayalive = False
                continue

            if self.noisy:
                self.logger.debug('Doing work')
            try:
                sesshandler.handlesession(self)
            except Exception as e:
                # a failing session must not take the worker down
                self.logger.error('Unhandled Exception : %s' % e)
            self.workerstate = 'task completed'

        self.workerstate = 'ending'
        self.logger.debug('thread end')