Initial code.

This commit is contained in:
Daniele Maglie 2023-12-05 23:22:55 +01:00
commit 44ae7a0100
10 changed files with 290 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
__pycache__/

18
table/activity.py Normal file
View file

@ -0,0 +1,18 @@
from .base import Base
class Activity(Base):
TABLE_ID = "tblTGVXeqPecdKpTm"
VARIATIONS_VIEW_ID = "viwxuGJT9KRlUISFa"
def get_variations(self) -> dict:
"""
Method used to retrieve variations from the view.
:return: list of variation records.
"""
response = self.request.get_activity(self.TABLE_ID, self.VARIATIONS_VIEW_ID)
return response.json()

14
table/base.py Normal file
View file

@ -0,0 +1,14 @@
import sys
import os
current = os.path.dirname('../')
sys.path.append(current)
from util.airtable_request import AirtableRequest
class Base:
request = None
def __init__(self):
self.request = AirtableRequest()

17
table/fqdn.py Normal file
View file

@ -0,0 +1,17 @@
from .base import Base
class FQDN(Base):
TABLE_ID = "tblK0murYomondNdt"
BCY_SERIE_A_VIEW_ID = "viwSKcHeEHqLG4ZU0"
def get_record(self, record_id):
response = self.request.get_record(self.TABLE_ID, record_id)
return response.json()
def get_total(self):
response = self.request.get_total(self.TABLE_ID, self.BCY_SERIE_A_VIEW_ID, ['Fully Qualified Domain Name'])
return response.json()

10
table/ip_address.py Normal file
View file

@ -0,0 +1,10 @@
from .base import Base
class IPAddress(Base):
TABLE_ID = "tblJKiHOUidKr5kC8"
def get_records(self):
response = self.request.get_total(self.TABLE_ID, None, ['IP Address'])
return response.json()

52
util/airtable_request.py Normal file
View file

@ -0,0 +1,52 @@
import requests
import urllib.parse
class AirtableRequest:
KEY = ""
BASE_URL = "https://api.airtable.com/v0"
BASE_ID = "appTJAR7zX7X8Rrk6"
headers = {
'Authorization': f'Bearer {KEY}'
}
def get_total(self, table_id: str, view_id: str = None, fields: list = []) -> dict:
compiled_fields = ""
for field in fields:
field = field.replace(" ", "+")
compiled_fields += urllib.parse.quote(f"fields[]={field}&", safe = '&=+')
compiled_fields = compiled_fields.rstrip("&")
url = f"{self.BASE_URL}/{self.BASE_ID}/{table_id}"
if view_id:
url += f"/{view_id}"
if compiled_fields:
url += f"?{compiled_fields}"
result = requests.get(url, headers = self.headers)
return result
def get_record(self, table_id, record_id):
url = f"{self.BASE_URL}/{self.BASE_ID}/{table_id}/{record_id}"
result = requests.get(url, headers = self.headers)
return result
def get_activity(self, table_id, view_id):
sorting = urllib.parse.quote("sort[0][field]=Day&sort[0][direction]=desc", safe = '&=')
url = f"{self.BASE_URL}/{self.BASE_ID}/{table_id}?view={view_id}&{sorting}"
result = requests.get(url, headers = self.headers)
return result

10
util/lookup.py Normal file
View file

@ -0,0 +1,10 @@
import socket
class Resolve:
def fqdn(self, fqdn):
try:
return socket.gethostbyname(fqdn)
except socket.gaierror:
return None

View file

@ -0,0 +1,19 @@
import requests
import urllib.parse
class SecurityTrailsRequest:
KEY = ""
BASE_URL = "https://api.securitytrails.com/v1"
headers = {
'APIKEY': KEY
}
def get_neighbors(self, ip_address: str) -> dict:
url = f"{self.BASE_URL}/ips/nearby/{ip_address}"
result = requests.get(url, headers = self.headers)
return result

8
util/whois.py Normal file
View file

@ -0,0 +1,8 @@
from whois import whois
class Whois:
def get_text(self, value):
result = whois(value)
return result.text

141
variations.py Normal file
View file

@ -0,0 +1,141 @@
from table.activity import Activity
from table.fqdn import FQDN
from table.ip_address import IPAddress
from util.lookup import Resolve
from util.whois import Whois
class Application:
activity = None
fqdn = None
ip_address = None
resolve = None
whois = None
def __init__(self):
self.activity = Activity()
self.fqdn = FQDN()
self.ip_address = IPAddress()
self.resolve = Resolve()
self.whois = Whois()
def process(self):
# list of ips to check
total_ip_addresses = self.get_clean_ip_addresses()
total_fqdns = self.get_clean_fqdns()
print(total_fqdns)
exit()
# list of grouped variations
grouped_variations = self.get_grouped_variations()
# loop through variations
for key in grouped_variations:
print(f" -> {key}")
# loop through iptvs under single variation
for iptv in grouped_variations[key]:
variated_fqdns = []
# loop through iptvs' fqdn
for fqdn_record_id in iptv['fields']['FQDN']:
single_fqdn_record = self.fqdn.get_record(fqdn_record_id)
if 'Univocal IP Address' in single_fqdn_record['fields'].keys():
resolved_ip_address = self.resolve.fqdn(single_fqdn_record['fields']['FQDN'])
# if empty, go on
if not resolved_ip_address:
continue
# if already in the ip addresses list, go on
if resolved_ip_address in total_ip_addresses:
continue
# if not changed, go on
if single_fqdn_record['fields']['Univocal IP Address'][0] == resolved_ip_address:
continue
if self.check_unwanteds(single_fqdn_record['fields']['FQDN']):
continue
single_variated_iptv = {
'fqdn': single_fqdn_record['fields']['FQDN'],
'old_ip_address': single_fqdn_record['fields']['Univocal IP Address'][0],
'new_ip_address': resolved_ip_address
}
# TODO: check ip nearbies on security trails
# TODO: build list of fqdns on airtable
variated_fqdns.append(single_variated_iptv)
if variated_fqdns:
print(f" - {iptv['fields']['IPTV Name VARIATION'][0]}")
for variated_fqdn in variated_fqdns:
print(f" {variated_fqdn['fqdn']} ({variated_fqdn['old_ip_address']} -> {variated_fqdn['new_ip_address']})")
def get_grouped_variations(self):
variations = self.activity.get_variations()
grouped_records = {}
# grouping records by Serie/Season/Day
for record in variations.get("records", []):
# field Serie/Season/Day
group_field_value = record["fields"].get("Serie/Season/Day")[0]
if group_field_value in list(grouped_records.keys()):
grouped_records[group_field_value].append(record)
else:
grouped_records[group_field_value] = [record]
return grouped_records
# [ '1.2.3.4', '1.1.1.1', .. ]
def get_clean_ip_addresses(self):
records = self.ip_address.get_records()
return self.get_only_field(records, 'IP Address')
def get_clean_fqdns(self):
records = self.fqdn.get_total()
return self.get_only_field(records, 'Fully Qualified Domain Name')
def get_only_field(self, data: dict, field: str):
clean_records = []
for record in data.get("records", []):
if field in record['fields'].keys():
clean_records.append(record['fields'][field])
return clean_records
def check_unwanteds(self, value):
result = self.whois.get_text(value)
result = result.lower()
if 'cloudflare' in result or 'namecheap' in result or 'amazon' in result or 'google' in result:
return True
return False
a = Application()
a.process()