commit 44ae7a0100598a7a7561e9eb331012aa8f518a1f Author: Daniele Maglie Date: Tue Dec 5 23:22:55 2023 +0100 Initial code. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c18dd8d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/table/activity.py b/table/activity.py new file mode 100644 index 0000000..ecf2d04 --- /dev/null +++ b/table/activity.py @@ -0,0 +1,18 @@ +from .base import Base + +class Activity(Base): + + TABLE_ID = "tblTGVXeqPecdKpTm" + + VARIATIONS_VIEW_ID = "viwxuGJT9KRlUISFa" + + def get_variations(self) -> dict: + """ + Method used to retrieve variations from the view. + + :return: list of variation records. + """ + + response = self.request.get_activity(self.TABLE_ID, self.VARIATIONS_VIEW_ID) + + return response.json() diff --git a/table/base.py b/table/base.py new file mode 100644 index 0000000..775296d --- /dev/null +++ b/table/base.py @@ -0,0 +1,14 @@ +import sys +import os + +current = os.path.dirname('../') +sys.path.append(current) + +from util.airtable_request import AirtableRequest + +class Base: + + request = None + + def __init__(self): + self.request = AirtableRequest() diff --git a/table/fqdn.py b/table/fqdn.py new file mode 100644 index 0000000..17c3a6c --- /dev/null +++ b/table/fqdn.py @@ -0,0 +1,17 @@ +from .base import Base + +class FQDN(Base): + + TABLE_ID = "tblK0murYomondNdt" + + BCY_SERIE_A_VIEW_ID = "viwSKcHeEHqLG4ZU0" + + def get_record(self, record_id): + response = self.request.get_record(self.TABLE_ID, record_id) + + return response.json() + + def get_total(self): + response = self.request.get_total(self.TABLE_ID, self.BCY_SERIE_A_VIEW_ID, ['Fully Qualified Domain Name']) + + return response.json() diff --git a/table/ip_address.py b/table/ip_address.py new file mode 100644 index 0000000..0adc6a1 --- /dev/null +++ b/table/ip_address.py @@ -0,0 +1,10 @@ +from .base import Base + +class IPAddress(Base): + + TABLE_ID = "tblJKiHOUidKr5kC8" + + def get_records(self): + response = self.request.get_total(self.TABLE_ID, None, ['IP Address']) + + return response.json() diff --git a/util/airtable_request.py b/util/airtable_request.py new file mode 100644 index 0000000..350cd6e --- /dev/null +++ b/util/airtable_request.py @@ -0,0 +1,52 @@ +import requests +import urllib.parse + +class AirtableRequest: + + KEY = "" + + BASE_URL = "https://api.airtable.com/v0" + + BASE_ID = "appTJAR7zX7X8Rrk6" + + headers = { + 'Authorization': f'Bearer {KEY}' + } + + def get_total(self, table_id: str, view_id: str = None, fields: list = []) -> dict: + compiled_fields = "" + + for field in fields: + field = field.replace(" ", "+") + + compiled_fields += urllib.parse.quote(f"fields[]={field}&", safe = '&=+') + + compiled_fields = compiled_fields.rstrip("&") + + url = f"{self.BASE_URL}/{self.BASE_ID}/{table_id}" + + if view_id: + url += f"/{view_id}" + + if compiled_fields: + url += f"?{compiled_fields}" + + result = requests.get(url, headers = self.headers) + + return result + + def get_record(self, table_id, record_id): + url = f"{self.BASE_URL}/{self.BASE_ID}/{table_id}/{record_id}" + + result = requests.get(url, headers = self.headers) + + return result + + def get_activity(self, table_id, view_id): + sorting = urllib.parse.quote("sort[0][field]=Day&sort[0][direction]=desc", safe = '&=') + + url = f"{self.BASE_URL}/{self.BASE_ID}/{table_id}?view={view_id}&{sorting}" + + result = requests.get(url, headers = self.headers) + + return result diff --git a/util/lookup.py b/util/lookup.py new file mode 100644 index 0000000..87857e5 --- /dev/null +++ b/util/lookup.py @@ -0,0 +1,10 @@ +import socket + +class Resolve: + + def fqdn(self, fqdn): + try: + return socket.gethostbyname(fqdn) + + except socket.gaierror: + return None diff --git a/util/securitytrails_request.py b/util/securitytrails_request.py new file mode 100644 index 0000000..6bf4483 --- /dev/null +++ b/util/securitytrails_request.py @@ -0,0 +1,19 @@ +import requests +import urllib.parse + +class SecurityTrailsRequest: + + KEY = "" + + BASE_URL = "https://api.securitytrails.com/v1" + + headers = { + 'APIKEY': KEY + } + + def get_neighbors(self, ip_address: str) -> dict: + url = f"{self.BASE_URL}/ips/nearby/{ip_address}" + + result = requests.get(url, headers = self.headers) + + return result diff --git a/util/whois.py b/util/whois.py new file mode 100644 index 0000000..88b27ba --- /dev/null +++ b/util/whois.py @@ -0,0 +1,8 @@ +from whois import whois + +class Whois: + + def get_text(self, value): + result = whois(value) + + return result.text diff --git a/variations.py b/variations.py new file mode 100644 index 0000000..e180b29 --- /dev/null +++ b/variations.py @@ -0,0 +1,141 @@ +from table.activity import Activity +from table.fqdn import FQDN +from table.ip_address import IPAddress + +from util.lookup import Resolve +from util.whois import Whois + +class Application: + + activity = None + + fqdn = None + + ip_address = None + + resolve = None + + whois = None + + def __init__(self): + self.activity = Activity() + + self.fqdn = FQDN() + + self.ip_address = IPAddress() + + self.resolve = Resolve() + + self.whois = Whois() + + def process(self): + # list of ips to check + total_ip_addresses = self.get_clean_ip_addresses() + + total_fqdns = self.get_clean_fqdns() + + print(total_fqdns) + exit() + + # list of grouped variations + grouped_variations = self.get_grouped_variations() + + # loop through variations + for key in grouped_variations: + print(f" -> {key}") + + # loop through iptvs under single variation + for iptv in grouped_variations[key]: + variated_fqdns = [] + + # loop through iptvs' fqdn + for fqdn_record_id in iptv['fields']['FQDN']: + single_fqdn_record = self.fqdn.get_record(fqdn_record_id) + + if 'Univocal IP Address' in single_fqdn_record['fields'].keys(): + resolved_ip_address = self.resolve.fqdn(single_fqdn_record['fields']['FQDN']) + + # if empty, go on + if not resolved_ip_address: + continue + + # if already in the ip addresses list, go on + if resolved_ip_address in total_ip_addresses: + continue + + # if not changed, go on + if single_fqdn_record['fields']['Univocal IP Address'][0] == resolved_ip_address: + continue + + if self.check_unwanteds(single_fqdn_record['fields']['FQDN']): + continue + + single_variated_iptv = { + 'fqdn': single_fqdn_record['fields']['FQDN'], + 'old_ip_address': single_fqdn_record['fields']['Univocal IP Address'][0], + 'new_ip_address': resolved_ip_address + } + + # TODO: check ip nearbies on security trails + + # TODO: build list of fqdns on airtable + + variated_fqdns.append(single_variated_iptv) + + if variated_fqdns: + print(f" - {iptv['fields']['IPTV Name VARIATION'][0]}") + + for variated_fqdn in variated_fqdns: + print(f" {variated_fqdn['fqdn']} ({variated_fqdn['old_ip_address']} -> {variated_fqdn['new_ip_address']})") + + def get_grouped_variations(self): + variations = self.activity.get_variations() + + grouped_records = {} + + # grouping records by Serie/Season/Day + for record in variations.get("records", []): + # field Serie/Season/Day + group_field_value = record["fields"].get("Serie/Season/Day")[0] + + if group_field_value in list(grouped_records.keys()): + grouped_records[group_field_value].append(record) + + else: + grouped_records[group_field_value] = [record] + + return grouped_records + + # [ '1.2.3.4', '1.1.1.1', .. ] + def get_clean_ip_addresses(self): + records = self.ip_address.get_records() + + return self.get_only_field(records, 'IP Address') + + def get_clean_fqdns(self): + records = self.fqdn.get_total() + + return self.get_only_field(records, 'Fully Qualified Domain Name') + + def get_only_field(self, data: dict, field: str): + clean_records = [] + + for record in data.get("records", []): + if field in record['fields'].keys(): + clean_records.append(record['fields'][field]) + + return clean_records + + def check_unwanteds(self, value): + result = self.whois.get_text(value) + + result = result.lower() + + if 'cloudflare' in result or 'namecheap' in result or 'amazon' in result or 'google' in result: + return True + + return False + +a = Application() + +a.process()