OXIESEC PANEL
- Current Dir:
/
/
opt
/
alt
/
python27
/
lib
/
python2.7
/
site-packages
/
postomaat
/
plugins
/
ratelimit
Server IP: 2a02:4780:11:1084:0:327f:3464:10
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
__init__.py
91 bytes
03/16/2023 02:20:00 PM
rw-r--r--
📄
__init__.pyc
288 bytes
12/18/2024 09:49:00 AM
rw-r--r--
📄
main.py
7.69 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
main.pyc
7.01 KB
12/18/2024 09:49:00 AM
rw-r--r--
📁
strategies
-
12/18/2024 09:49:00 AM
rwxr-xr-x
Editing: main.py
Close
# -*- coding: UTF-8 -*-
import re
import os
import sys
from collections import defaultdict
from postomaat.shared import ScannerPlugin, DUNNO, string_to_actioncode, apply_template
from postomaat.plugins.ratelimit.strategies import AVAILABLE_STRATEGIES, AVAILABLE_BACKENDS

if sys.version_info > (3,):
    unicode = str #pylint: disable=C0103


class Limiter(object):
    """One rate limit rule as parsed from a line of the limiter config file."""

    def __init__(self):
        self.name = None
        self.strategy = None
        self.max = -1 # negative value: no limit
        self.timespan = 1
        self.fields = []
        self.regex = None
        self.skip = None
        self.action = DUNNO
        self.message = 'Limit exceeded'

    def __str__(self):
        # '!s' instead of ':s' so that a limiter whose name is still None can
        # be printed; None does not accept the 's' format spec on Python 3.
        return "<Limiter name={name!s} rate={limit:d}/{timespan:d} fields={fields:s}>".format(
            name=self.name,
            limit=self.max,
            timespan=self.timespan,
            fields=",".join(self.fields)
        )


class RateLimitPlugin(ScannerPlugin):
    """Apply the rate limits defined in the limiter config file to a suspect."""

    def __init__(self, config, section=None):
        ScannerPlugin.__init__(self, config, section)
        self.requiredvars = {
            'limiterfile': {
                'default': '/etc/postomaat/conf.d/ratelimit.conf',
                'description': 'file based rate limits',
            },
            'backendtype':{
                'default': 'memory',
                'description': 'type of backend where the events are stored. memory is only recommended for low traffic standalone systems. alternatives are: redis, sqlalchemy' #pylint: disable=C0301
            },
            'backendconfig':{
                'default': '',
                'description': 'backend specific configuration. sqlalchemy: the database url, redis: redis://:[password]@hostname:6379/0' #pylint: disable=C0301
            }
        }

        self.logger = self._logger()
        # strategy name -> backend instance, filled by load_backends()
        self.backends = dict()
        # strategies referenced by the parsed limiters (attribute name kept as is)
        self.reqired_strategies = list()
        # None until the limiter file has been read by examine()
        self.limiters = None

        self.logger.info("RateLimit plugin available backends: %s",
                         ' '.join([str(k) + " => " + str(AVAILABLE_BACKENDS[k].keys())
                                   for k in AVAILABLE_BACKENDS.keys()]))

    def load_limiter_config(self, text):
        """
        Parse limiter definitions from text and return them as a list of Limiter objects.

        Each non-empty line not starting with '#' must look like:
        limit name=<name> strategy=<strategy> rate=<max>/<time> fields=<f1,f2,...>
        [match=/<regex>/ [skip=<name1,name2,...>]] action=<action> message=<message>

        Lines that do not match are logged and ignored. Every valid strategy
        seen is recorded once in self.reqired_strategies.
        """
        patt = re.compile(r'^limit\s+name=(?P<name>[^\s]+)\s+strategy=(?P<strategy>[^\s]+)\s+rate=(?P<max>\-?\d{1,10})\/(?P<time>\d{1,10})\s+fields=(?P<fieldlist>[^\s]+)(\s+match=\/(?P<matchregex>.+)\/(\s+skip=(?P<skiplist>[^\s]+))?)?\s+action=(?P<action>[^\s]+)\s+message=(?P<message>.*)$') #pylint: disable=C0301
        limiters = []

        for lineno, line in enumerate(text.split('\n'), 1):
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            match = patt.match(line)
            if match is None:
                self.logger.error('cannot parse limiter config line %d', lineno)
                continue

            gdict = match.groupdict()
            limiter = Limiter()
            limiter.name = gdict['name']
            limiter.strategy = gdict['strategy']
            limiter.max = int(gdict['max'])
            limiter.timespan = int(gdict['time'])
            limiter.fields = gdict['fieldlist'].split(',')
            limiter.regex = gdict['matchregex']
            if gdict['skiplist'] is not None:
                limiter.skip = gdict['skiplist'].split(',')

            # NOTE(review): a limiter with an invalid action or strategy is
            # only reported, it is still added to the result - confirm that
            # this is intended before tightening it.
            action = string_to_actioncode(gdict['action'])
            if action is None:
                self.logger.error('Limiter config line %d: invalid action %s', lineno, gdict['action'])
            limiter.action = action
            limiter.message = gdict['message']

            if limiter.strategy not in AVAILABLE_STRATEGIES:
                self.logger.error('Limiter config line %d: invalid strategy %s', lineno, gdict['strategy'])
            elif limiter.strategy not in self.reqired_strategies:
                # record each strategy once so its backend is set up a single time
                self.reqired_strategies.append(limiter.strategy)

            limiters.append(limiter)
        return limiters

    def load_backends(self):
        """
        Of all the AVAILABLE_BACKENDS load only the backends required by limiters.

        A strategy that is already loaded, has no backend of the configured
        type, or fails to initialise does not prevent the remaining
        strategies from being loaded.
        """
        if not self.reqired_strategies:
            return

        btype = self.config.get(self.section, 'backendtype')
        backendconfig = self.config.get(self.section, 'backendconfig')

        for strategy in self.reqired_strategies:
            if strategy in self.backends:
                # already set up - move on to the next strategy
                continue
            if btype not in AVAILABLE_BACKENDS[strategy]:
                self.logger.error('RateLimit backend %s not available for strategy %s', btype, strategy)
                continue
            try:
                self.backends[strategy] = AVAILABLE_BACKENDS[strategy][btype](backendconfig)
            except Exception as ex:
                # str(ex) instead of ex.message: the latter does not exist on Python 3
                error = type(ex).__name__, str(ex)
                self.logger.error('Failed to load backend %s.%s error %s', strategy, btype, error)

    def _get_field_values(self, suspect, limiter):
        """
        Collect the values of all fields of limiter from suspect.

        A field is looked up as attribute of suspect first and as tag second.
        Returns the list of values as strings, or None if any field is not
        available (the limiter can not be applied in that case).
        """
        values = []
        for fieldname in limiter.fields:
            if hasattr(suspect, fieldname):
                value = getattr(suspect, fieldname)
            else:
                # a falsy tag counts as not available
                value = suspect.get_tag(fieldname) or None

            if value is None:
                self.logger.debug('Skipping limiter %s - suspect field or tag %s not available',
                                  limiter.name, fieldname)
                return None

            # the values are joined into the event name, so they must be text
            if not isinstance(value, (str, unicode)):
                value = str(value)
            values.append(value)
        return values

    def examine(self, suspect):
        """
        Check suspect against all configured limiters.

        The limiter file and the backends are loaded on first use. Returns a
        tuple (action, message) for the first limiter whose limit is exceeded,
        None otherwise (also if the limiter file does not exist).
        """
        if self.limiters is None:
            filename = self.config.get(self.section, 'limiterfile')
            if not os.path.exists(filename):
                self.logger.error('Limiter config file %s not found', filename)
                return None
            with open(filename) as filehandle:
                limiterconfig = filehandle.read()
            limiters = self.load_limiter_config(limiterconfig)
            self.limiters = limiters
            self.logger.info('Found %d limiter configurations', len(limiters))
            self.load_backends()

        # names of limiters which a previously matching limiter asked to skip
        skipped = set()
        for limiter in self.limiters:
            if limiter.name in skipped:
                self.logger.debug('limiter %s skipped due to previous match', limiter.name)
                continue

            fieldvalues = self._get_field_values(suspect, limiter)
            if fieldvalues is None:
                # rate limit can not be applied
                continue

            checkval = ','.join(fieldvalues)
            if limiter.regex is not None:
                if not re.match(limiter.regex, checkval):
                    self.logger.debug('Skipping limiter %s - regex does not match', limiter.name)
                    continue
                if limiter.skip is not None:
                    skipped.update(limiter.skip)

            if limiter.max < 0:
                # no limit
                continue

            eventname = limiter.name + checkval
            backend = self.backends.get(limiter.strategy)
            if backend is None:
                self.logger.error('No backend loaded for strategy "%s" - limiter %s not applied',
                                  limiter.strategy, limiter.name)
                continue

            try:
                (allow, count) = backend.check_allowed(eventname, limiter.max, limiter.timespan)
                self.logger.debug('Limiter event %s count: %d', eventname, count)
                if not allow:
                    return limiter.action, apply_template(limiter.message, suspect)
            except Exception as ex:
                # best effort: a failing backend must not stop the remaining limiters
                error = type(ex).__name__, str(ex)
                self.logger.error('Failed to run limitter backend for strategy "%s" eventname %s error %s',
                                  limiter.strategy, eventname, error)
        return None