OXIESEC PANEL
- Current Dir:
/
/
opt
/
alt
/
python27
/
lib
/
python2.7
/
site-packages
/
postomaat
/
plugins
Server IP: 2a02:4780:11:1084:0:327f:3464:10
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
__init__.py
578 bytes
03/16/2023 02:20:00 PM
rw-r--r--
📄
__init__.pyc
162 bytes
12/18/2024 09:49:00 AM
rw-r--r--
📄
blackwhitelist.py
10.08 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
blackwhitelist.pyc
7.95 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
call-ahead.py
77.45 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
call-ahead.pyc
67.93 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
complexrules.py
14.59 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
complexrules.pyc
17.37 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
dbwriter.py
5.7 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
dbwriter.pyc
4.69 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
ebl-lookup.py
9.03 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
ebl-lookup.pyc
8.3 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
enforcetls.py
5.1 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
enforcetls.pyc
4.39 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
fluentd_writer.py
6.92 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
fluentd_writer.pyc
6.73 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
geoip.py
7.47 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
geoip.pyc
7.4 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
helotld.py
3.37 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
helotld.pyc
3.07 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
killer.py
1.15 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
killer.pyc
1.47 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
messagesize.py
3.86 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
messagesize.pyc
3.54 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
originpolicy.py
11.2 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
originpolicy.pyc
9.51 KB
12/18/2024 09:49:00 AM
rw-r--r--
📁
ratelimit
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
rdns.py
4.42 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
rdns.pyc
4.82 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
recipientrules.py
11.64 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
recipientrules.pyc
10.1 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
script.py
5.55 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
script.pyc
6 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
spfcheck.py
15.57 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
spfcheck.pyc
11.36 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
srs.py
4.51 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
srs.pyc
3.85 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
suspect_collect.py
7.84 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
suspect_collect.pyc
8.42 KB
12/18/2024 09:49:00 AM
rw-r--r--
Editing: blackwhitelist.py
Close
# -*- coding: UTF-8 -*- # Copyright 2012-2018 Fumail Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # # __version__ = "0.0.1" from postomaat.shared import ScannerPlugin, OK, DUNNO, REJECT, DISCARD, DEFER_IF_PERMIT, strip_address, extract_domain, get_default_cache from postomaat.extensions.sql import SQL_EXTENSION_ENABLED, get_session import fnmatch GLOBALSCOPE = '$GLOBAL' LISTING_TYPES = (dict(name='whitelist_to', cmp=['to_address']), dict(name='more_spam_to', cmp=['to_address']), dict(name='all_spam_to', cmp=['to_address']), dict(name='whitelist_from', cmp=['from_address', 'from_domain']), dict(name='blacklist_to', cmp=['to_address']), dict(name='blacklist_from', cmp=['from_address', 'from_domain']), ) if SQL_EXTENSION_ENABLED: from sqlalchemy import Column from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.types import Unicode, Integer DeclarativeBase = declarative_base() metadata = DeclarativeBase.metadata class UserPref(DeclarativeBase): __tablename__ = 'userpref' prefid = Column(Integer, primary_key=True) username = Column(Unicode(100),nullable=False) preference = Column(Unicode(30),nullable=False) value = Column(Unicode(100),nullable=False) class BlackWhiteList(ScannerPlugin): """ This is a black- and whitelisting plugin reading the spamassassin userpref table from an SQL database. """ def __init__(self,config,section=None): ScannerPlugin.__init__(self,config,section) self.logger=self._logger() self.requiredvars={ 'dbconnection':{ 'default':"mysql://spamassassin@localhost/spamassassin?charset=utf8", 'description':'SQLAlchemy connection string', }, 'usecache':{ 'default':"True", 'description':'Use Mem Cache. This is recommended. However, if enabled it will take up to 5 minutes until a listing gets effective.', }, 'action_whitelist_to':{ 'default':"OK", 'description':'Action for this listing type', }, 'message_whitelist_to':{ 'default':"", 'description':'Message for this listing type', }, 'action_more_spam_to':{ 'default':"OK", 'description':'Action for this listing type', }, 'message_more_spam_to':{ 'default':"", 'description':'Message for this listing type', }, 'action_all_spam_to':{ 'default':"OK", 'description':'Action for this listing type', }, 'message_all_spam_to':{ 'default':"", 'description':'Message for this listing type', }, 'action_whitelist_from':{ 'default':"OK", 'description':'Action for this listing type', }, 'message_whitelist_from':{ 'default':"", 'description':'Message for this listing type', }, 'action_blacklist_to':{ 'default':"REJECT", 'description':'Action for this listing type', }, 'message_blacklist_to':{ 'default':"", 'description':'Blacklisted recipient', }, 'action_blacklist_from':{ 'default':"REJECT", 'description':'Action for this listing type', }, 'message_blacklist_from':{ 'default':"", 'description':'Blacklisted sender', }, } def _get_listings(self): usecache = self.config.getboolean(self.section,'usecache') listings = None cache = get_default_cache() if usecache: listings = cache.get('listings') if not listings: listings = {} try: session = get_session(self.config.get(self.section,'dbconnection')) listing_types = [(l['name']).decode('utf-8') for l in LISTING_TYPES] result = session.query(UserPref).filter(UserPref.preference.in_(listing_types)).all() for r in result: listing_type = r.preference if not listing_type in listings: listings[listing_type] = {} username = r.username if username.startswith('*@'): # roundcube sauserprefs plugin domain wide scope username = username.lstrip('*@') if not username in listings[listing_type]: listings[listing_type][username] = [] listings[listing_type][username].append(r.value) except Exception as e: self.logger.error('Failed to get listings: %s' % str(e)) if listings and usecache: cache.put('listings', listings) return listings def _check_list(self, listtype, listings, user, value): if not listings: return False if not listtype in listings: return False listed = False userlistings = listings[listtype].get(user, []) for l in userlistings: if fnmatch.fnmatch(value, l): listed = True break return listed def _get_action(self, checkname): actionstring = self.config.get(self.section,'action_%s' % checkname).upper() if actionstring == 'OK': action = OK elif actionstring == 'DUNNO': action = DUNNO elif actionstring == 'REJECT': action = REJECT elif actionstring == 'DISCARD': action = DISCARD else: action = None self.logger.warning('Invalid action: %s in option action_%s' % (actionstring, checkname)) return action def _get_message(self, checkname): message = self.config.get(self.section,'message_%s' % checkname).strip() if not message: message = None return message def examine(self,suspect): if not SQL_EXTENSION_ENABLED: return DUNNO from_address=suspect.get_value('sender') if from_address is None: self.logger.warning('No FROM address found') return DEFER_IF_PERMIT,'internal policy error (no from address)' from_address=strip_address(from_address) try: from_domain = extract_domain(from_address) except ValueError as e: self.logger.warning(str(e)) return DUNNO to_address=suspect.get_value('recipient') if to_address is None: self.logger.warning('No RCPT address found') return DEFER_IF_PERMIT,'internal policy error (no rcpt address)' to_address=strip_address(to_address) to_domain=extract_domain(to_address) listings = self._get_listings() result = DUNNO message = None compare = '' found = False for check in LISTING_TYPES: for cmp_value in check['cmp']: if cmp_value == 'to_address': compare = to_address elif cmp_value == 'from_address': compare = from_address elif cmp_value == 'from_domain': compare = from_domain if compare is None: compare = '' for scope in [GLOBALSCOPE, '%%%s' % to_domain, to_address]: if self._check_list(check['name'], listings, scope, compare): result = self._get_action(check['name']) message = self._get_message(check['name']) found = True break if found: break if found: break return result, message def lint(self): status = True if not SQL_EXTENSION_ENABLED: print("sqlalchemy is not installed") status = False try: session = get_session(self.config.get(self.section,'dbconnection')) try: session.query(UserPref).first() except Exception as e: print("Table or field configuration error: %s"%str(e)) status = False except Exception as e: print("DB Connection failed. Reason: %s"%(str(e))) status = False if status: listings = self._get_listings() count = 0 for listingtype in listings: for user in listings[listingtype]: count += len(listings[listingtype][user]) print("found %s listings" % count) for check in LISTING_TYPES: if self._get_action(check['name']) is None: print('Invalid action %s for action_%s' % (self.config.get(self.section,'action_%s' % check['name']), check['name'])) status = False return status def __str__(self): return "BlackWhiteList"