DMARC Report prosessing
I just would like to have DMARC report into 1 folder, and make them packed into CSV, as popular DMARC processing SaaS is a little bit costy.
code it into dmarc.py
import os
import gzip
import zipfile
from lxml import etree
import csv
import datetime
def extract_info_from_filename(filename):
"""
Extracts start and end datetimes from a given filename.
Args:
- filename (str): The filename containing Unix timestamps for start and end times.
- filename expecting..: 'google.com!domainname!1707004800!1707091199.xml'
Returns:
- tuple: A tuple containing start and end datetime objects.
"""
# Assuming filenames are like report_starttime_endtime.xml
# Extract timestamps from filename
parts = filename.replace(".gz", "").replace(".xml", "").split("!")
if len(parts) >= 4:
to_domain, from_domain, start_timestamp, end_timestamp, *other_abysss = parts
try:
start_datetime = datetime.datetime.fromtimestamp(int(start_timestamp))
end_datetime = datetime.datetime.fromtimestamp(int(end_timestamp))
except ValueError:
start_datetime = datetime.datetime.fromtimestamp(1)
end_datetime = datetime.datetime.fromtimestamp(2)
else:
to_domain = 'to.example.com'
from_domain = 'from.example.com'
start_datetime = datetime.datetime.fromtimestamp(1)
end_datetime = datetime.datetime.fromtimestamp(2)
return to_domain, from_domain, start_datetime, end_datetime
def extract_and_parse_files(directory, output_csv):
# Open or create the CSV file for output
with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
csvwriter = csv.writer(csvfile)
# Write the CSV header
csvwriter.writerow(['Filename', 'src', 'count', 'Header-From Domain', 'SPF Domnain', 'SPF check', 'SPF alignment', 'DKIM Domain', 'DKIM check', 'DKIM alignment' , 'Detail', 'DMARC disposition', 'To', 'From', 'Start', 'End'])
# Process each file in the directory
for root, dirs, files in os.walk(directory):
for file in files:
file_path = os.path.join(root, file)
# Check if the file is a .gz or .zip file and extract XML content
if file.endswith('.gz'):
with gzip.open(file_path, 'rb') as f:
xml_content = f.read()
parse_xml_content(xml_content, file, csvwriter)
elif file.endswith('.zip'):
with zipfile.ZipFile(file_path, 'r') as zip_ref:
for name in zip_ref.namelist():
with zip_ref.open(name) as f:
xml_content = f.read()
parse_xml_content(xml_content, name, csvwriter)
def parse_xml_content(xml_content, filename, csvwriter):
# Parse the XML content
root = etree.fromstring(xml_content)
# Iterate through the records in the XML
for record in root.findall('.//record'):
row = record.find('.//row')
source_ip = row.find('.//source_ip').text
count = row.find('.//count').text
policy_evaluated = row.find('.//policy_evaluated')
identifiers = record.find('.//identifiers')
header_from = identifiers.find('.//header_from').text if identifiers is not None else 'Unknown'
# DMARC evaluation
dmarc_fail = policy_evaluated.find('.//disposition').text
# SPF evaluation
spf_result = record.find('.//auth_results//spf//result').text if record.find('.//auth_results//spf//result') is not None else 'n/a'
spf_fail = spf_result
spf_domain = record.find('.//auth_results//spf//domain').text if record.find('.//auth_results//spf//domain') is not None else 'n/a'
spf_aligned = 'pass' if spf_domain == header_from else 'fail'
# # DKIM evaluation
# dkim_result = record.find('.//auth_results//dkim//result').text if record.find('.//auth_results//dkim//result') is not None else 'n/a'
# dkim_fail = dkim_result
# dkim_domain = record.find('.//auth_results//dkim//domain').text if record.find('.//auth_results//dkim//domain') is not None else 'n/a'
# dkim_aligned = 'pass' if dkim_domain == header_from else 'fail'
dkim_detail = ''
dkim_fail = ''
dkim_domain = ''
dkim_aligned = ''
for dkim in record.findall('.//auth_results//dkim'):
d_result = dkim.find('.//result').text if dkim.find('.//result') is not None else 'none'
d_domain = dkim.find('.//domain').text if dkim.find('.//domain') is not None else 'none'
d_aligned= 'pass' if d_domain == header_from else 'fail'
dkim_detail += d_result + ';' + d_domain + ';' + d_aligned + '; '
if dkim_fail == '' and dkim_domain == '' and dkim_aligned == '':
dkim_fail = d_result
dkim_domain = d_domain
dkim_aligned = d_aligned
else:
if d_result == 'pass' and d_aligned == 'pass':
dkim_fail = d_result
dkim_domain = d_domain
dkim_aligned = d_aligned
if d_aligned == 'fail':
dkim_detail += '\n' + etree.tostring(record, pretty_print=True).decode() + '\n'
# info from filename
to_domain, from_domain, start_datetime, end_datetime = extract_info_from_filename(filename)
# Write the record to the CSV
csvwriter.writerow(['\'' + filename + '\'' , source_ip, count, header_from, spf_domain , spf_fail, spf_aligned, dkim_domain , dkim_fail, dkim_aligned , dkim_detail, dmarc_fail, to_domain, from_domain, start_datetime, end_datetime])
def main():
directory = 'dmarc-reps' # Update this path to where your DMARC reports are stored
output_csv = 'dmarc_results.csv' # Output CSV file name
extract_and_parse_files(directory, output_csv)
if __name__ == "__main__":
main()
How to use
install python
sudo apt install python3 python-is-python3
pip install gzip zipfile lxml
code dmarc.py
mkdir dmarc-reps
Place gz or zip files into dmarc-reps folder, upon creation of dmarc.oy and dmarc-reps folder.
Run it and open dmarc_result.csv
python dmarc.py
code dmarc_result.csv
It looks like this, and you can check DMARC pass or not, checking either of SPF/SPF-alignment or DKIM/DKIM-alignment pass or not.
Enjoy! and if you have questions, please DM on https://twitter.com/rtree