From fbe1acf220b69e244776105390b1abf5b77a96e8 Mon Sep 17 00:00:00 2001 From: jesopo Date: Thu, 12 Dec 2019 14:13:17 +0000 Subject: refactor dnsbl module, show reason for positive detection when possible --- modules/dnsbl.py | 50 -------------------------------------------- modules/dnsbl/__init__.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++ modules/dnsbl/lists.py | 40 +++++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 50 deletions(-) delete mode 100644 modules/dnsbl.py create mode 100644 modules/dnsbl/__init__.py create mode 100644 modules/dnsbl/lists.py (limited to 'modules') diff --git a/modules/dnsbl.py b/modules/dnsbl.py deleted file mode 100644 index 154d73cc..00000000 --- a/modules/dnsbl.py +++ /dev/null @@ -1,50 +0,0 @@ -import ipaddress -from src import ModuleManager, utils -import dns.resolver - -DEFAULT_LISTS = [ - "rbl.efnetrbl.org", - "zen.spamhaus.org" -] - -class Module(ModuleManager.BaseModule): - @utils.hook("received.command.dnsbl") - def dnsbl(self, event): - args = event["args_split"] - - lists = [] - for i, arg in reversed(list(enumerate(args))): - if arg[0] == "@": - lists.insert(0, args.pop(i)) - lists = lists or DEFAULT_LISTS - - address = args[0] - failed = self._check_lists(lists, address) - if failed: - event["stderr"].write("%s failed for lists: %s" % - (address, ", ".join(failed))) - else: - event["stdout"].write("%s not found in blacklists" % address) - - def _check_lists(self, lists, address): - address_obj = ipaddress.ip_address(address) - - if address_obj.version == 6: - address = reversed(address_obj.exploded.replace(":", "")) - else: - address = reversed(address.split(".")) - address = ".".join(address) - - failed = [] - for list in lists: - if not self._check_list(list, address): - failed.append(list) - return failed - - def _check_list(self, list, address): - list_address = "%s.%s" % (address, list) - try: - dns.resolver.query(list_address) - except dns.resolver.NXDOMAIN: - return True - return False diff --git a/modules/dnsbl/__init__.py b/modules/dnsbl/__init__.py new file mode 100644 index 00000000..2c3fa36d --- /dev/null +++ b/modules/dnsbl/__init__.py @@ -0,0 +1,53 @@ +import ipaddress +from src import ModuleManager, utils +import dns.resolver +from . import lists as _lists + +class Module(ModuleManager.BaseModule): + @utils.hook("received.command.dnsbl") + def dnsbl(self, event): + args = event["args_split"] + + default_lists = _lists.default_lists() + lists = [] + for i, arg in reversed(list(enumerate(args))): + if arg[0] == "@": + hostname = args.pop(i) + if hostname in default_lists: + lists.insert(0, default_lists[hostname]) + else: + lists.insert(0, lists.DNSBL(hostname)) + + lists = lists or list(default_lists.values()) + + address = args[0] + failed = self._check_lists(lists, address) + if failed: + failed = ["%s (%s)" % item for item in failed] + event["stderr"].write("%s failed for lists: %s" % + (address, ", ".join(failed))) + else: + event["stdout"].write("%s not found in blacklists" % address) + + def _check_lists(self, lists, address): + address_obj = ipaddress.ip_address(address) + + if address_obj.version == 6: + address = reversed(address_obj.exploded.replace(":", "")) + else: + address = reversed(address.split(".")) + address = ".".join(address) + + failed = [] + for list in lists: + record = self._check_list(list.hostname, address) + if not record == None: + failed.append((list.hostname, list.process(record))) + return failed + + def _check_list(self, list, address): + list_address = "%s.%s" % (address, list) + try: + return dns.resolver.query(list_address, "A")[0].to_text() + except dns.resolver.NXDOMAIN: + return None diff --git a/modules/dnsbl/lists.py b/modules/dnsbl/lists.py new file mode 100644 index 00000000..0ae6a1b6 --- /dev/null +++ b/modules/dnsbl/lists.py @@ -0,0 +1,40 @@ +import collections + +class DNSBL(object): + def __init__(self, hostname=None): + if not hostname == None: + self.hostname = hostname + + def process(self, result: str): + return "unknown" + +class ZenSpamhaus(DNSBL): + hostname = "zen.spamhaus.org" + def process(self, result): + result = result.rsplit(".", 1)[1] + if result in ["2", "3", "9"]: + return "spam" + elif result in ["4", "5", "6", "7"]: + return "exploits" +class EFNetRBL(DNSBL): + hostname = "rbl.efnetrbl.org" + SPAMTRAP = ["2", "3"] + def process(self, result): + result = result.rsplit(".", 1)[1] + if result == "1": + return "proxy" + elif result in self.SPAMTRAP: + return "spamtap" + elif result == "4": + return "tor" + elif result == "5": + return "flooding" + +DEFAULT_LISTS = [ + ZenSpamhaus(), + EFNetRBL() +] + +def default_lists(): + return collections.OrderedDict( + (dnsbl.hostname, dnsbl) for dnsbl in DEFAULT_LISTS) -- cgit v1.3.1-10-gc9f91