OXIESEC PANEL
- Current Dir:
/
/
opt
/
alt
/
python27
/
lib
/
python2.7
/
site-packages
/
postomaat
/
plugins
Server IP: 2a02:4780:11:1084:0:327f:3464:10
Upload:
Create Dir:
Name
Size
Modified
Perms
📁
..
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
__init__.py
578 bytes
03/16/2023 02:20:00 PM
rw-r--r--
📄
__init__.pyc
162 bytes
12/18/2024 09:49:00 AM
rw-r--r--
📄
blackwhitelist.py
10.08 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
blackwhitelist.pyc
7.95 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
call-ahead.py
77.45 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
call-ahead.pyc
67.93 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
complexrules.py
14.59 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
complexrules.pyc
17.37 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
dbwriter.py
5.7 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
dbwriter.pyc
4.69 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
ebl-lookup.py
9.03 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
ebl-lookup.pyc
8.3 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
enforcetls.py
5.1 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
enforcetls.pyc
4.39 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
fluentd_writer.py
6.92 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
fluentd_writer.pyc
6.73 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
geoip.py
7.47 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
geoip.pyc
7.4 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
helotld.py
3.37 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
helotld.pyc
3.07 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
killer.py
1.15 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
killer.pyc
1.47 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
messagesize.py
3.86 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
messagesize.pyc
3.54 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
originpolicy.py
11.2 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
originpolicy.pyc
9.51 KB
12/18/2024 09:49:00 AM
rw-r--r--
📁
ratelimit
-
12/18/2024 09:49:00 AM
rwxr-xr-x
📄
rdns.py
4.42 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
rdns.pyc
4.82 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
recipientrules.py
11.64 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
recipientrules.pyc
10.1 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
script.py
5.55 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
script.pyc
6 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
spfcheck.py
15.57 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
spfcheck.pyc
11.36 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
srs.py
4.51 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
srs.pyc
3.85 KB
12/18/2024 09:49:00 AM
rw-r--r--
📄
suspect_collect.py
7.84 KB
03/16/2023 02:20:00 PM
rw-r--r--
📄
suspect_collect.pyc
8.42 KB
12/18/2024 09:49:00 AM
rw-r--r--
Editing: originpolicy.py
Close
# -*- coding: UTF-8 -*-
#   Copyright 2012-2018 Fumail Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
"""
This plugin lets you configure from which hosts you are willing to accept mail for a given domain.

Check by recipient domain (MX Rules):
This can be useful if you provide shared hosting (= many domains on one mail server) and some of the domains
use a cloud based spam filter (= MX records not pointing directly to your hosting server). You can reject mail
coming from unexpected hosts trying to bypass the spam filter.

Check by sender domain (SPF Rules):
Some domains/freemailers do not have an SPF record, although their domains are frequently forged and abused as
spam sender. This plugin allows you to build your own fake SPF database.

Check forward block (FWD Rules):
Some users forward abusive amounts of improperly filtered mail. This mail is hard to filter as it's delivered
through an additional relay, leading to unnecessarily high amounts of false negatives. To protect recipients and
spam filter reputation such mail can be blocked.
"""

__version__ = "0.0.5"

import os
import re

try:
    from netaddr import IPAddress, IPNetwork
    HAVE_NETADDR = True
except ImportError:
    IPAddress = IPNetwork = None
    HAVE_NETADDR = False

from postomaat.shared import ScannerPlugin, DUNNO, DEFER, REJECT, strip_address, extract_domain, FileList, apply_template


class RulesCache(FileList):
    """
    Rule file mapping a key (domain or address) to the hosts allowed for it.

    Every rule line has the form ``key item[,item...]``: the key, whitespace,
    then a comma separated list of items. An item that looks like an IP
    address or network is stored in ``self.addresses[key]`` as an
    ``IPNetwork``; every other item is stored in ``self.names[key]`` as a
    lower-cased host name (suffix).
    """

    # Dotted-quad IPv4 address with optional /prefix, or any 3-39 character
    # run of hex digits and colons (treated as an IPv6 address).
    _REGEX_IP = re.compile(r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(/\d{1,2})?|[a-f0-9:]{3,39})$')

    def __init__(self, filename=None, strip=True, skip_empty=True, skip_comments=True, lowercase=False,
                 additional_filters=None, minimum_time_between_reloads=5):
        # The containers must exist before FileList.__init__ runs.
        # NOTE(review): presumably the base constructor triggers the first
        # _reload() - confirm against postomaat.shared.FileList.
        self.addresses = {}
        self.names = {}
        FileList.__init__(self, filename, strip, skip_empty, skip_comments, lowercase, additional_filters,
                          minimum_time_between_reloads)

    def _reload(self):
        """
        Parse ``self.filename`` and replace the cached rules.

        The rules are collected in fresh dictionaries which are assigned at
        the very end, so entries removed from the file disappear on reload
        and a parse error leaves the previously loaded rules untouched.

        Raises whatever ``open`` or ``IPNetwork`` raise for an unreadable
        file or an item that looks like an address but cannot be parsed.
        """
        # NOTE(review): this override does not call FileList._reload();
        # confirm that the base class needs no bookkeeping from it.
        addresses = {}
        names = {}
        with open(self.filename) as fp:
            for line in fp:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                data = line.split(None, 1)
                if len(data) != 2:
                    # a key without any items is not a rule
                    continue
                domain, nets = data
                for item in nets.split(','):
                    item = item.strip().lower()
                    if not item:
                        # Trailing/double comma: an empty host name would
                        # never match and thereby block every client.
                        continue
                    if self._REGEX_IP.match(item):
                        network = IPNetwork(item)
                        known = addresses.setdefault(domain, [])
                        if network not in known:
                            known.append(network)
                    else:
                        known = names.setdefault(domain, [])
                        if item not in known:
                            known.append(item)
        self.addresses = addresses
        self.names = names

    def _permitted_ip(self, domain, ip):
        """Return True if *domain* has no address rules or *ip* lies in one of its networks."""
        networks = self.addresses.get(domain)
        if networks is None:
            return True
        client = IPAddress(ip)
        return any(client in net for net in networks)

    def _permitted_name(self, domain, hostname):
        """Return True if *domain* has no name rules or *hostname* equals / is a subdomain of a listed name."""
        names = self.names.get(domain)
        if names is None:
            return True
        return any(hostname == name or hostname.endswith('.%s' % name) for name in names)

    def permitted(self, domain, ip, hostname, default=True):
        """
        Check whether the client (*ip*, *hostname*) matches the rules of *domain*.

        :param domain: key to look up (domain, or full address for forward rules)
        :param ip: client IP address as string
        :param hostname: client host name
        :param default: value returned if *domain* is not listed at all
        :return: *default* for unlisted keys; otherwise True only if both the
                 address check and the name check pass (a check for which the
                 key has no entries passes automatically)
        """
        self._reload_if_necessary()

        # domain is not listed: nothing to enforce, let the caller decide
        if domain not in self.addresses and domain not in self.names:
            return default

        ip_perm = self._permitted_ip(domain, ip)
        name_perm = self._permitted_name(domain, hostname)
        return ip_perm and name_perm


class OriginPolicy(ScannerPlugin):
    """
    Restrict the hosts mail is accepted from, based on three rule files:
    recipient domain (MX rules), sender domain (SPF rules) and forward block
    (FWD rules). The checks run in that order and the first one that does not
    return DUNNO decides.
    """

    def __init__(self, config, section=None):
        ScannerPlugin.__init__(self, config, section)
        self.logger = self._logger()

        # rule caches, created on first use once the data file exists
        self.mxrules = None
        self.spfrules = None
        self.fwdrules = None

        self.requiredvars = {
            'datafile_mx': {
                'default': '/etc/postomaat/conf.d/enforcemx.txt',
                'description': 'recipient domain based rule file',
            },
            'messagetemplate_mx': {
                'default': 'We do not accept mail for ${to_address} from ${reverse_client_address}. Please send to MX records!'
            },
            'datafile_spf': {
                'default': '/etc/postomaat/conf.d/fakespf.txt',
                'description': 'sender domain based rule file',
            },
            'messagetemplate_spf': {
                'default': 'We do not accept mail for ${from_domain} from ${client_address} with name ${reverse_client_name}. Please use the official mail servers!'
            },
            'datafile_fwd': {
                'default': '/etc/postomaat/conf.d/forwardblock.txt',
                'description': 'sender domain based rule file',
            },
            'messagetemplate_fwd': {
                'default': 'We do not accept forwarded mail for ${to_address} from ${reverse_client_name}.'
            },
        }

    def _get_rules(self, attribute, option):
        """Return the RulesCache stored in *attribute*, creating it from config *option* on first use (None if the file does not exist)."""
        rules = getattr(self, attribute)
        if not rules:
            datafile = self.config.get(self.section, option)
            if not os.path.exists(datafile):
                return None
            rules = RulesCache(datafile)
            setattr(self, attribute, rules)
        return rules

    def examine(self, suspect):
        """
        Run the MX, SPF and forward checks in order.

        :return: tuple (action, message) of the first check whose action is
                 not DUNNO, or the result of the last check
        """
        if not HAVE_NETADDR:
            return DUNNO, None

        client_address = suspect.get_value('client_address')
        if client_address is None:
            self.logger.error('No client address found - skipping')
            # always hand back an (action, message) tuple like every other path
            return DUNNO, None

        client_name = suspect.get_value('client_name')
        if client_name is None:
            client_name = 'unknown'

        action, message = DUNNO, None
        for check in (self._examine_mx, self._examine_spf, self._examine_fwd):
            action, message = check(suspect, client_address, client_name)
            if action != DUNNO:
                break
        return action, message

    def _examine_mx(self, suspect, client_address, client_name):
        """Check the client against the rules of the recipient domain."""
        to_address = suspect.get_value('recipient')
        if to_address is None:
            self.logger.warning('No RCPT address found')
            return DEFER, 'internal policy error (no rcpt address)'

        to_address = strip_address(to_address).lower()
        to_domain = extract_domain(to_address)

        rules = self._get_rules('mxrules', 'datafile_mx')
        if rules is None:
            return DUNNO, None

        action = DUNNO
        message = None
        if not rules.permitted(to_domain, client_address, client_name):
            # without a resolvable client name only defer
            if client_name == 'unknown':
                action = DEFER
            else:
                action = REJECT
            message = apply_template(self.config.get(self.section, 'messagetemplate_mx'), suspect)

        return action, message

    def _examine_spf(self, suspect, client_address, client_name):
        """Check the client against the rules of the sender domain."""
        from_address = suspect.get_value('sender')
        if from_address is None:
            self.logger.warning('No FROM address found')
            return DEFER, 'internal policy error (no from address)'

        from_address = strip_address(from_address)
        from_domain = extract_domain(from_address)

        rules = self._get_rules('spfrules', 'datafile_spf')
        if rules is None:
            return DUNNO, None

        action = DUNNO
        message = None
        if not rules.permitted(from_domain, client_address, client_name):
            # without a resolvable client name only defer
            if client_name == 'unknown':
                action = DEFER
            else:
                action = REJECT
            message = apply_template(self.config.get(self.section, 'messagetemplate_spf'), suspect)

        return action, message

    def _examine_fwd(self, suspect, client_address, client_name):
        """
        Check the client against the forward block rules.

        Unlike the MX and SPF checks a match BLOCKS the mail: the rules are
        looked up by recipient domain and by full recipient address, with
        unlisted keys counting as "no match" (default=False).
        """
        to_address = suspect.get_value('recipient')
        if to_address is None:
            self.logger.warning('No RCPT address found')
            return DEFER, 'internal policy error (no rcpt address)'

        to_address = strip_address(to_address)
        to_domain = extract_domain(to_address)

        rules = self._get_rules('fwdrules', 'datafile_fwd')
        if rules is None:
            return DUNNO, None

        action = DUNNO
        message = None
        if rules.permitted(to_domain, client_address, client_name, default=False) \
                or rules.permitted(to_address, client_address, client_name, default=False):
            if suspect.get_value('reverse_client_name') == 'unknown':
                action = DEFER
            else:
                action = REJECT
            message = apply_template(self.config.get(self.section, 'messagetemplate_fwd'), suspect)

        return action, message

    def lint(self):
        """
        Verify dependencies, configuration and data files.

        :return: False if netaddr is missing, the config check fails or none
                 of the three data files is usable; True otherwise
        """
        lint_ok = True

        if not HAVE_NETADDR:
            print('netaddr python module not available - please install')
            lint_ok = False

        if not self.checkConfig():
            print('Error checking config')
            lint_ok = False

        datafiles = (
            ('datafile_mx', 'MX datafile not found - this plugin will not enforce MX usage'),
            ('datafile_spf', 'SPF datafile not found - this plugin will not check fake SPF'),
            ('datafile_fwd', 'Forward block datafile not found - this plugin will not check forwards'),
        )
        any_check_active = False
        for option, warning in datafiles:
            datafile = self.config.get(self.section, option)
            if datafile and not os.path.exists(datafile):
                print(warning)
            else:
                any_check_active = True

        # a plugin without any usable rule file does nothing at all
        if not any_check_active:
            lint_ok = False

        return lint_ok

    def __str__(self):
        return self.__class__.__name__