diff options
| author | 2019-12-12 14:13:17 +0000 | |
|---|---|---|
| committer | 2019-12-12 14:13:17 +0000 | |
| commit | fbe1acf220b69e244776105390b1abf5b77a96e8 (patch) | |
| tree | 46dfcb8182ea2a069809b936e5ae72271995ad89 /modules/dnsbl/__init__.py | |
| parent | list.insert() takes an index! (diff) | |
| signature | ||
refactor dnsbl module, show reason for positive detection when possible
Diffstat (limited to 'modules/dnsbl/__init__.py')
| -rw-r--r-- | modules/dnsbl/__init__.py | 53 |
1 files changed, 53 insertions, 0 deletions
diff --git a/modules/dnsbl/__init__.py b/modules/dnsbl/__init__.py new file mode 100644 index 00000000..2c3fa36d --- /dev/null +++ b/modules/dnsbl/__init__.py @@ -0,0 +1,53 @@ +import ipaddress +from src import ModuleManager, utils +import dns.resolver +from . import lists as _lists + +class Module(ModuleManager.BaseModule): + @utils.hook("received.command.dnsbl") + def dnsbl(self, event): + args = event["args_split"] + + default_lists = _lists.default_lists() + lists = [] + for i, arg in reversed(list(enumerate(args))): + if arg[0] == "@": + hostname = args.pop(i) + if hostname in default_lists: + lists.insert(0, default_lists[hostname]) + else: + lists.insert(0, lists.DNSBL(hostname)) + + lists = lists or list(default_lists.values()) + + address = args[0] + failed = self._check_lists(lists, address) + if failed: + failed = ["%s (%s)" % item for item in failed] + event["stderr"].write("%s failed for lists: %s" % + (address, ", ".join(failed))) + else: + event["stdout"].write("%s not found in blacklists" % address) + + def _check_lists(self, lists, address): + address_obj = ipaddress.ip_address(address) + + if address_obj.version == 6: + address = reversed(address_obj.exploded.replace(":", "")) + else: + address = reversed(address.split(".")) + address = ".".join(address) + + failed = [] + for list in lists: + record = self._check_list(list.hostname, address) + if not record == None: + failed.append((list.hostname, list.process(record))) + return failed + + def _check_list(self, list, address): + list_address = "%s.%s" % (address, list) + try: + return dns.resolver.query(list_address, "A")[0].to_text() + except dns.resolver.NXDOMAIN: + return None |
