OXIESEC PANEL
- Current Dir:
/
/
opt
/
alt
/
python27
/
lib
/
python2.7
/
site-packages
/
postomaat
/
plugins
Server IP: 2a02:4780:11:1084:0:327f:3464:10
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
__init__.py
578 bytes
03/16/2023 02:20:00 PM
rw-r--r--
📄
__init__.pyc
162 bytes
12/18/2024 09:49:00 AM
rw-r--r--
📄
blackwhitelist.py
10.08 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
blackwhitelist.pyc
7.95 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
call-ahead.py
77.45 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
call-ahead.pyc
67.93 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
complexrules.py
14.59 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
complexrules.pyc
17.37 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
dbwriter.py
5.7 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
dbwriter.pyc
4.69 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
ebl-lookup.py
9.03 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
ebl-lookup.pyc
8.3 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
enforcetls.py
5.1 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
enforcetls.pyc
4.39 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
fluentd_writer.py
6.92 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
fluentd_writer.pyc
6.73 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
geoip.py
7.47 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
geoip.pyc
7.4 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
helotld.py
3.37 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
helotld.pyc
3.07 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
killer.py
1.15 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
killer.pyc
1.47 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
messagesize.py
3.86 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
messagesize.pyc
3.54 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
originpolicy.py
11.2 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
originpolicy.pyc
9.51 KB
12/18/2024 09:49:00 AM
rw-r--r--
📁
ratelimit
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
rdns.py
4.42 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
rdns.pyc
4.82 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
recipientrules.py
11.64 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
recipientrules.pyc
10.1 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
script.py
5.55 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
script.pyc
6 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
spfcheck.py
15.57 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
spfcheck.pyc
11.36 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
srs.py
4.51 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
srs.pyc
3.85 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
suspect_collect.py
7.84 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
suspect_collect.pyc
8.42 KB
12/18/2024 09:49:00 AM
rw-r--r--
Editing: spfcheck.py
Close
# -*- coding: UTF-8 -*-
#   Copyright 2012-2018 Fumail Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#

from postomaat.shared import ScannerPlugin, DUNNO, strip_address, extract_domain, apply_template, \
    FileList, string_to_actioncode, get_default_cache, Cache
from postomaat.extensions.sql import SQL_EXTENSION_ENABLED, get_session, get_domain_setting
import os
import fnmatch

try:
    import spf
    HAVE_SPF = True
except ImportError:
    spf = None
    HAVE_SPF = False

try:
    from netaddr import IPAddress, IPNetwork
    HAVE_NETADDR = True
except ImportError:
    IPAddress = IPNetwork = None
    HAVE_NETADDR = False


class SPFPlugin(ScannerPlugin):
    """This plugin performs SPF validation using the pyspf module https://pypi.python.org/pypi/pyspf/
    by default, it just logs the result (test mode)

    to enable actual rejection of messages, add a config option on_<resulttype> with a valid postfix action.
    eg:

    on_fail = REJECT

    valid result types are: 'pass', 'permerror', 'fail', 'temperror', 'softfail', 'none', and 'neutral'
    you probably want to define REJECT for fail and softfail


    operation mode examples
    -----------------------
    I want to reject all hard fails and accept all soft fails:
      - do not set domain_selective_spf_file
      - set selective_softfail to False
      - set on_fail to REJECT and on_softfail to DUNNO

    I want to reject all hard fails and all soft fails:
      - do not set domain selective_spf_file
      - set selective_softfail to False
      - set on_fail to REJECT and on_softfail to REJECT

    I only want to reject select hard and soft fails
      - set a domain_selective_spf_file and list the domains to be tested
      - set selective_softfail to False
      - set on_fail to REJECT and on_softfail to REJECT

    I want to reject all hard fails and only selected soft fails:
      - set a domain_selective_spf_file and list the domains to be tested for soft fail
      - set selective_softfail to True
      - set on_fail to REJECT and on_softfail to REJECT

    I want to reject select hard fails and accept all soft fails:
      - do not set domain selective_spf_file
      - set selective_softfail to True
      - set on_fail to REJECT and on_softfail to DUNNO
    """

    def __init__(self, config, section=None):
        """Set up the logger, the per-domain decision cache, the config schema and the lazy file loaders."""
        ScannerPlugin.__init__(self, config, section)
        self.logger = self._logger()
        # caches the "check this sender domain?" decision made by check_this_domain()
        self.check_cache = Cache()

        self.requiredvars = {
            'ip_whitelist_file': {
                'default': '',
                'description': 'file containing a list of ip adresses to be exempted from SPF checks. Supports CIDR notation if the netaddr module is installed. 127.0.0.0/8 is always exempted',
            },
            'domain_whitelist_file': {
                'default': '',
                'description': 'if this is non-empty, all except sender domains in this file will be checked for SPF',
            },
            'domain_selective_spf_file': {
                'default': '',
                'description': 'if this is non-empty, only sender domains in this file will be checked for SPF',
            },
            'selective_softfail': {
                'default': '0',
                'description': 'evaluate all senders for hard fails (unless listed in domain_whitelist_file) and only evaluate softfail for domains listed in domain_selective_spf_file',
            },
            'check_subdomain': {
                'default': '0',
                'description': 'apply checks to subdomain of whitelisted/selective domains',
            },
            'dbconnection': {
                'default': "mysql://root@localhost/spfcheck?charset=utf8",
                'description': 'SQLAlchemy Connection string. Leave empty to disable SQL lookups',
            },
            'domain_sql_query': {
                'default': "SELECT check_spf from domain where domain_name=:domain",
                'description': 'get from sql database :domain will be replaced with the actual domain name. must return field check_spf',
            },
            'on_fail': {
                'default': 'DUNNO',
                'description': 'Action for SPF fail.',
            },
            'on_softfail': {
                'default': 'DUNNO',
                'description': 'Action for SPF softfail.',
            },
            'messagetemplate': {
                'default': 'SPF ${result} for domain ${from_domain} from ${client_address} : ${explanation}'
            },
        }

        if HAVE_NETADDR:
            self.private_nets = [
                IPNetwork('10.0.0.0/8'),  # private network
                IPNetwork('127.0.0.0/8'),  # localhost
                IPNetwork('169.254.0.0/16'),  # link local
                IPNetwork('172.16.0.0/12'),  # private network
                IPNetwork('192.168.0.0/16'),  # private network
                IPNetwork('fe80::/10'),  # ipv6 link local
                IPNetwork('::1/128'),  # localhost
            ]
        else:
            self.private_nets = None

        self.ip_whitelist_loader = None
        # either a list of plain ip adress strings or a list of IPNetwork if netaddr is available
        self.ip_whitelist = []

        self.selective_domain_loader = None
        self.domain_whitelist_loader = None

    def _domain_in_list(self, domain, domain_list, check_subdomain):
        """Return True if `domain` matches any entry of `domain_list`.

        An entry matches when it equals the domain, or when it ends in '.*' and
        matches the domain as an fnmatch pattern. If `check_subdomain` is true,
        subdomains of an entry ('<anything>.<entry>') match as well.
        """
        for item in domain_list:
            if item == domain:
                return True
            if check_subdomain and domain.endswith('.%s' % item):
                return True
            if item.endswith('.*'):
                if fnmatch.fnmatch(domain, item):
                    return True
                if check_subdomain and fnmatch.fnmatch(domain, '*.%s' % item):
                    return True
        return False

    def check_this_domain(self, from_domain):
        """Decide whether SPF results for `from_domain` should be enforced.

        Sources are consulted in this order and the first one that yields a
        decision wins: domain_whitelist_file (listed -> False),
        domain_selective_spf_file (listed -> True), SQL lookup (truthy -> True).
        If none decides, the result is False. The decision is stored in
        self.check_cache and returned from there on later calls.
        """
        do_check = self.check_cache.get_cache(from_domain)
        if do_check is not None:
            return do_check

        do_check = None
        check_subdomain = self.config.getboolean(self.section, 'check_subdomain')

        domain_whitelist_file = self.config.get(self.section, 'domain_whitelist_file').strip()
        if domain_whitelist_file != '' and os.path.exists(domain_whitelist_file):
            if self.domain_whitelist_loader is None:
                self.domain_whitelist_loader = FileList(domain_whitelist_file, lowercase=True)
            # whitelisted sender domains are exempt from SPF checks; the whitelist
            # has precedence over the selective file and the SQL lookup.
            # (the condition used to be negated, which exempted every domain
            # that was NOT on the whitelist)
            if self._domain_in_list(from_domain, self.domain_whitelist_loader.get_list(), check_subdomain):
                do_check = False

        if do_check is None:
            selective_sender_domain_file = self.config.get(self.section, 'domain_selective_spf_file').strip()
            if selective_sender_domain_file != '' and os.path.exists(selective_sender_domain_file):
                if self.selective_domain_loader is None:
                    self.selective_domain_loader = FileList(selective_sender_domain_file, lowercase=True)
                if self._domain_in_list(from_domain, self.selective_domain_loader.get_list(), check_subdomain):
                    do_check = True

        if do_check is None:
            dbconnection = self.config.get(self.section, 'dbconnection').strip()
            sqlquery = self.config.get(self.section, 'domain_sql_query')

            if dbconnection != '' and SQL_EXTENSION_ENABLED:
                cache = get_default_cache()
                if get_domain_setting(from_domain, dbconnection, sqlquery, cache, self.section, False, self.logger):
                    do_check = True

            elif dbconnection != '' and not SQL_EXTENSION_ENABLED:
                self.logger.error('dbconnection specified but sqlalchemy not available - skipping db lookup')

        if do_check is None:
            do_check = False

        self.check_cache.put_cache(from_domain, do_check)
        return do_check

    def is_private_address(self, addr):
        """Return True if `addr` is a loopback, link-local or private-range address.

        With netaddr the address is tested against self.private_nets. Without it
        a string-prefix approximation of the same networks is used.
        """
        if HAVE_NETADDR:
            ipaddr = IPAddress(addr)
            for net in self.private_nets:
                if ipaddr in net:
                    return True
            return False

        # prefixes end with a separator so that e.g. '10.' cannot match '100.x.x.x'
        if addr == '::1' or addr.startswith(('127.', '10.', '192.168.', '169.254.', 'fe80:')):
            return True
        if not addr.startswith('172.'):
            return False
        # 172.16.0.0/12 covers second octets 16..31; the trailing dot keeps
        # '172.16' from matching public addresses such as 172.160.x.x
        return any(addr.startswith('172.%s.' % i) for i in range(16, 32))

    def ip_whitelisted(self, addr):
        """Return True if `addr` is private or listed in ip_whitelist_file.

        The whitelist is kept in self.ip_whitelist (IPNetwork objects when netaddr
        is available, plain strings otherwise). It is loaded when the loader is
        first created and again whenever the loader reports a file change.
        """
        if self.is_private_address(addr):
            return True

        # check ip whitelist
        try:
            ip_whitelist_file = self.config.get(self.section, 'ip_whitelist_file').strip()
        except Exception:
            ip_whitelist_file = ''

        if ip_whitelist_file == '' or not os.path.exists(ip_whitelist_file):
            return False

        reload_needed = False
        if self.ip_whitelist_loader is None:
            self.ip_whitelist_loader = FileList(ip_whitelist_file, lowercase=True)
            # always populate on first use, whatever file_changed() reports
            reload_needed = True

        if reload_needed or self.ip_whitelist_loader.file_changed():
            plainlist = self.ip_whitelist_loader.get_list()
            if HAVE_NETADDR:
                self.ip_whitelist = [IPNetwork(x) for x in plainlist]
            else:
                self.ip_whitelist = plainlist

        # always test against the stored list: a per-call local list would be
        # empty on every call that did not reload the file
        if HAVE_NETADDR:
            checkaddr = IPAddress(addr)
            for net in self.ip_whitelist:
                if checkaddr in net:
                    return True
            return False
        return addr in self.ip_whitelist

    def examine(self, suspect):
        """Run the SPF check for `suspect` and map the result to an action.

        Returns DUNNO when the check is skipped (pyspf missing, incomplete
        request data, whitelisted client, empty sender, domain not selected),
        otherwise a tuple (action, message). The SPF result is also stored in
        suspect.tags['spf'].
        """
        if not HAVE_SPF:
            return DUNNO

        client_address = suspect.get_value('client_address')
        helo_name = suspect.get_value('helo_name')
        sender = suspect.get_value('sender')
        if client_address is None or helo_name is None or sender is None:
            self.logger.error('missing client_address or helo or sender')
            return DUNNO

        if self.ip_whitelisted(client_address):
            self.logger.info("Client %s is whitelisted - no SPF check" % client_address)
            return DUNNO

        sender_email = strip_address(sender)
        if sender_email == '' or sender_email is None:
            return DUNNO

        sender_domain = extract_domain(sender_email)
        if sender_domain is None:
            self.logger.error('no domain found in sender address %s' % sender_email)
            return DUNNO
        sender_domain = sender_domain.lower()

        check_domain = self.check_this_domain(sender_domain)
        selective_softfail = self.config.getboolean(self.section, 'selective_softfail')
        if not check_domain and not selective_softfail:
            # selective_softfail is False: check all and filter later
            self.logger.debug('skipping SPF check for %s' % sender_domain)
            return DUNNO

        result, explanation = spf.check2(client_address, sender_email, helo_name)
        suspect.tags['spf'] = result
        if result != 'none':
            self.logger.info('SPF client=%s, sender=%s, h=%s result=%s : %s'
                             % (client_address, sender_email, helo_name, result, explanation))

        action = DUNNO
        message = apply_template(self.config.get(self.section, 'messagetemplate'), suspect,
                                 dict(result=result, explanation=explanation))

        if result == 'fail' and (check_domain or selective_softfail):
            # reject on hard fail if domain is listed in domain_selective_spf_file or selective_softfail is enabled
            action = string_to_actioncode(self.config.get(self.section, 'on_fail'))
        elif result == 'softfail' and check_domain:
            # reject on soft fail if domain is listed in domain_selective_spf_file
            action = string_to_actioncode(self.config.get(self.section, 'on_softfail'))
        elif result == 'softfail' and not check_domain:
            # only log soft fail if domain is not listed in domain_selective_spf_file
            # (the domain was previously never interpolated into this message)
            self.logger.info('ignoring SPF check for %s evaluating to softfail' % sender_domain)
        elif result not in ['fail', 'softfail']:
            # custom action for none, neutral, pass
            if self.config.has_option(self.section, 'on_%s' % result):
                action = string_to_actioncode(self.config.get(self.section, 'on_%s' % result))

        return action, message

    def lint(self):
        """Check dependencies and configuration; print problems and return True if everything is usable."""
        lint_ok = True

        if not HAVE_SPF:
            print('pyspf or pydns module not installed - this plugin will do nothing')
            lint_ok = False

        if not HAVE_NETADDR:
            print('WARNING: netaddr python module not installed - IP whitelist will not support CIDR notation')

        if not self.checkConfig():
            print('Error checking config')
            lint_ok = False

        domain_whitelist_file = self.config.get(self.section, 'domain_whitelist_file').strip()
        if domain_whitelist_file != '' and not os.path.exists(domain_whitelist_file):
            print("domain_whitelist_file %s does not exist" % domain_whitelist_file)
            lint_ok = False

        selective_sender_domain_file = self.config.get(self.section, 'domain_selective_spf_file').strip()
        if selective_sender_domain_file != '' and not os.path.exists(selective_sender_domain_file):
            print("domain_selective_spf_file %s does not exist" % selective_sender_domain_file)
            lint_ok = False

        if domain_whitelist_file and selective_sender_domain_file:
            print('WARNING: domain_whitelist_file and domain_selective_spf_file specified - whitelist has precedence, will check all domains and ignore domain_selective_spf_file')

        ip_whitelist_file = self.config.get(self.section, 'ip_whitelist_file').strip()
        # complain only when the configured file is missing (this test used to
        # fire when the file was present)
        if ip_whitelist_file != '' and not os.path.exists(ip_whitelist_file):
            print("ip_whitelist_file %s does not exist - IP whitelist is disabled" % ip_whitelist_file)
            lint_ok = False

        sqlquery = self.config.get(self.section, 'domain_sql_query')
        dbconnection = self.config.get(self.section, 'dbconnection').strip()
        if not SQL_EXTENSION_ENABLED and dbconnection != '':
            print('SQLAlchemy not available, cannot use SQL backend')
            lint_ok = False
        elif dbconnection == '':
            print('No DB connection defined. Disabling SQL backend')
        else:
            if not sqlquery.lower().startswith('select '):
                lint_ok = False
                print('SQL statement must be a SELECT query')
            if lint_ok:
                # only try the database once everything else has passed
                try:
                    conn = get_session(dbconnection)
                    conn.execute(sqlquery, {'domain': 'example.com'})
                except Exception as e:
                    lint_ok = False
                    print(str(e))

        return lint_ok

    def __str__(self):
        """Return the short plugin name."""
        return "SPF"