#!/usr/bin/env python3
"""
bygglogistik-utils 1.0.0

Scrape intranet.bygglogistik.se.

Usage:
  bygglogistik-utils pay [--tax=<tax>]
  bygglogistik-utils personnel
  bygglogistik-utils report [-f <date_from>] [-t <date_to>] [-r <region>] [-p <project>]
  bygglogistik-utils path
  bygglogistik-utils -h|--help
  bygglogistik-utils --version

Options:
  --tax=<tax>
    The tax rate, between 0.0 and 1.0. Defaults to 0.30.
  -f <date_from>, --date-from=<date_from>
    The date (in a ISO 8601 format) from which to gather reports
    (inclusive). Defaults to yesterday.
  -t <date_to>, --date-to=<date_to>
    The date (in a ISO 8601 format) to which to gather reports
    (inclusive). Defaults to <date_from>.
  -r <region>, --region=<region>
    The region of the report. Defaults to the region of the logged in user.
  -p <project>, --project=<project>
    The project of the report. Default to all projects.
"""
# NOTE: The second line of the docstring above is used verbatim as the
# `--version` output (see `main`), so it must stay on line index 1.

## Imports
import sys
import os
import collections
import itertools
import functools
import json
import csv
import datetime
import math
import time
import random
import warnings

import docopt
import requests
import bs4

## Constants
### Connection
URL_BASE = 'https://intranet.bygglogistik.se'
# Seconds to wait for the server before giving up on a request.
TIMEOUT = 30
### Parsing
PARSER = 'lxml'
### Storage
CSV_DIALECT = csv.excel_tab
### Pay
# TODO: Make this data time-dependent.
PAY = 168.83 + 5.0
PAY_LEAD = 45.13
PAY_INCONVENIENT = 35.32
PAY_HOLIDAY = 0.13
### Time
# Minimum number of hours `report` expects for a shift (and exactly the
# number of hours it expects for a sick worker).
TIME_MIN = 4.0
# Time of day after which hours are counted as inconvenient in `report`.
TIME_INCONVENIENT_START = '18:00'
### Paths
# Data directory: $BYGGLOGISTIK_PATH if set, otherwise a
# 'bygglogistik-utils' directory below $APPDATA, $XDG_CONFIG_HOME or
# ~/.config (first one that is set). `expanduser` is used for the last
# fallback so that an unset $HOME does not raise at import time.
PATH = (
    os.environ.get('BYGGLOGISTIK_PATH') or
    os.path.join(
        (
            os.environ.get('APPDATA') or
            os.environ.get('XDG_CONFIG_HOME') or
            os.path.join(os.path.expanduser('~'), '.config')
        ),
        'bygglogistik-utils',
    )
)
PATH_LOGIN = os.path.join(PATH, 'login')
PATH_PAY = os.path.join(PATH, 'pay')
PATH_CACHE = os.path.join(PATH, 'cache')
### Sleep
SLEEP_MIN = 5
SLEEP_MAX = 10
### Output
INDENT = ' ' * 2

## Warnings
warnings.filterwarnings('ignore', category=bs4.XMLParsedAsHTMLWarning)


## Error
def error(message):
    """Print `message` to stderr, prefixed with "Error: ", and exit with
    status 1. Never returns."""
    print(f"Error: {message}", file=sys.stderr)
    # `sys.exit` rather than the `exit` builtin: the latter is injected by
    # the `site` module and is missing under `python -S` or frozen builds.
    sys.exit(1)


## Log
def log(message):
    """Print `message` to stderr."""
    print(message, file=sys.stderr)


## Print category
def print_category(indent, category, items):
    """Print a heading followed by aligned "key: value" lines.

    `indent` is the nesting level of the heading (in units of `INDENT`),
    `category` is the heading text and `items` is a mapping whose keys
    are strings. The entries are printed one level deeper than the
    heading, with the keys padded to a common width.
    """
    prefix = INDENT * indent
    print(f'{prefix}{category}')
    prefix += INDENT
    # +1 for the colon appended to every key. `default` keeps an empty
    # mapping from raising.
    width = 1 + max(map(len, items), default=0)
    for key, value in items.items():
        label = f'{key}:'
        print(f'{prefix}{label: <{width}} {value}')


## Try read
def try_read(path):
    """Return the contents of the file at `path`, or `None` if it does
    not exist. Any other `OSError` is reported through `error`."""
    try:
        with open(path) as file:
            return file.read()
    except FileNotFoundError:
        return None
    except OSError as e:
        error(f"Could not read '{path}': {e.strerror}")


## Try write
def try_write(path, func):
    """Open `path` for writing and call `func(file)` to fill it.

    The parent directory is created (mode 0o700) if needed. Any
    `OSError`, including one from creating the directory, is reported
    through `error`.
    """
    try:
        os.makedirs(os.path.dirname(path), mode=0o700, exist_ok=True)
        # newline='' because `func` may be a csv writer, which emits its
        # own line terminators; without it they get translated a second
        # time on platforms where os.linesep is not '\n'.
        with open(path, 'w', newline='') as file:
            func(file)
    except OSError as e:
        error(f"Could not write '{path}': {e.strerror}")


## Text
def text(element):
    """Return the stripped strings of `element` joined by single spaces,
    with every "- " sequence removed."""
    return " ".join(element.stripped_strings).replace("- ", "")


## Texts
def texts(iterable):
    """Return the list of `text(element)` for every element."""
    return [text(element) for element in iterable]


## Telephone
def telephone(s):
    """Format the digits of `s` as "XXX-XXX XX XX...".

    Every non-digit character of `s` is dropped first. Returns an empty
    string when `s` contains no digits.
    """
    # https://en.wikipedia.org/wiki/National_conventions_for_writing_telephone_numbers#Sweden
    # https://en.wikipedia.org/wiki/Telephone_numbers_in_Sweden#Area_codes
    n = ''.join(filter(str.isdigit, s))
    if not n:
        # Avoid rendering a missing number as a bare "-  ".
        return ''
    return f"{n[:3]}-{n[3:6]} {n[6:8]} {n[8:]}"


## Join
def join(iterable):
    """Join strings with the delimiter of `CSV_DIALECT`."""
    return CSV_DIALECT.delimiter.join(iterable)


## Get page
def get_page(session, url, data=None, cache=False):
    """Fetch `url` (relative to `URL_BASE`) and return it parsed.

    `session` is the `requests.Session` to use. If `data` is truthy it
    is sent as a POST body and caching is disabled; otherwise a GET is
    made. With `cache` enabled, a non-empty file below `PATH_CACHE` is
    used instead of the network when present, and a freshly fetched page
    is stored there.

    Returns a `bs4.BeautifulSoup` document. Network failures and
    non-OK status codes are reported through `error`.
    """
    if data:
        cache = False
    path_cache = f'{PATH_CACHE}/{url}'

    content = None
    if cache:
        content = try_read(path_cache)

    if not content:
        method = 'POST' if data else 'GET'
        try:
            response = session.request(
                method, f'{URL_BASE}{url}', data=data, timeout=TIMEOUT,
            )
        except requests.RequestException as e:
            error(f"Could not fetch '{url}': {e}")
        if not response.ok:
            error(f"Got status code {response.status_code}")
        response.encoding = response.apparent_encoding
        content = response.text

        if cache:
            def cache_write(file):
                file.write(content)
            try_write(path_cache, cache_write)

            # We use the fact that we're caching as a heuristic that we may
            # be putting the server under strain. Sleep for a random amount
            # of time to give it some breathing room.
            time_sleep = random.randrange(SLEEP_MIN, SLEEP_MAX + 1)
            log(f"Info: Sleeping for {time_sleep} seconds")
            time.sleep(time_sleep)

    return bs4.BeautifulSoup(content, PARSER)


## Get table
def get_table(page, id):
    """Read the HTML table with the given `id` from `page`.

    Rows whose own id is "comment" are skipped. The first remaining row
    supplies the field names.

    Returns `(fieldnames, rows)` where `rows` is a list of dicts mapping
    field name to cell text. A table without rows is reported through
    `error`.
    """
    table = [
        texts(row.select('th, td'))
        for row in page.select(f'table#{id} tr:not([id="comment"])')
    ]
    if not table:
        error(f"Could not find table '{id}'")
    fieldnames, *rows = table
    return fieldnames, [dict(zip(fieldnames, row)) for row in rows]


## Get selected
def get_selected(page, id):
    """Return the text of the selected option of the `<select>` with the
    given `id`. A missing selection is reported through `error`."""
    selected = page.select_one(f'select#{id} > option:checked')
    if selected is None:
        error(f"Could not find the selected {id}")
    return text(selected)


## Get option
def get_option(page, option, id):
    """Return the `value` attribute of the option labelled `option` in
    the `<select>` with the given `id`.

    Labels are compared case-insensitively (via `str.casefold`). An
    unknown label is reported through `error` together with the list of
    valid labels.
    """
    wanted = option.casefold()
    values = {
        text(element).casefold(): element['value']
        for element in page.select(f'select#{id} > option')
    }
    if wanted not in values:
        error("\n".join(
            [f"No such {id}: {wanted}. Valid {id}s:"] +
            [f' {label}' for label in values]
        ))
    return values[wanted]


## Get status
def get_status(element):
    """Translate the status class of `element` into a description.

    The first CSS class starting with "status" is used; its last
    character is read as a digit (0: not sent, 1: sent, 2: attested).
    Raises if `element` has no such class or the digit is unknown.
    """
    status = next(c for c in element['class'] if c.startswith('status'))
    return {
        0: 'not sent',
        1: 'sent',
        2: 'attested',
    }[int(status[-1])]


## Login
def login():
    """Log in to the intranet and return the authenticated session.

    The email and password are read from the JSON object stored in
    `PATH_LOGIN` (keys 'email' and 'password'); the environment
    variables BYGGLOGISTIK_EMAIL and BYGGLOGISTIK_PASSWORD take
    precedence when set. Missing credentials, an unreadable login file
    and a rejected login are all reported through `error`.
    """
    ### Get login email and password
    email = None
    password = None

    login_content = try_read(PATH_LOGIN)
    if login_content:
        try:
            login_json = json.loads(login_content)
        except json.JSONDecodeError as e:
            error(f"Could not parse '{PATH_LOGIN}': {e}")
        if not isinstance(login_json, dict):
            error(f"Could not parse '{PATH_LOGIN}': expected a JSON object")
        email = login_json.get('email')
        password = login_json.get('password')

    email = os.environ.get('BYGGLOGISTIK_EMAIL') or email
    password = os.environ.get('BYGGLOGISTIK_PASSWORD') or password

    if not (email and password):
        error("Could not get login email and password")

    ### Login
    session = requests.Session()
    page = get_page(session, '/login', {
        'data[User][email]': email,
        'data[User][password]': password,
    })
    # Still seeing the login button means that the login was rejected.
    if page.find('input', value='Logga in'):
        error("Could not log in")

    return session


## Pay
def pay(session, tax):
    """Print the estimated pay after tax per month, newest month first.

    The completed lifts listed on the user's page are merged with the
    ones stored in `PATH_PAY` from earlier runs, and the merged list is
    written back to `PATH_PAY`.

    `tax` is the tax rate between 0.0 and 1.0 as a string or number, or
    `None` for the default of 0.30. An invalid rate is reported through
    `error`.
    """
    ### Arguments
    if tax is None:
        tax = 0.30
    try:
        tax = float(tax)
    except (TypeError, ValueError):
        error(f"Invalid tax rate: {tax}")
    if not 0.0 <= tax <= 1.0:
        error(f"Tax rate must be between 0.0 and 1.0: {tax}")

    ### Get data
    link = get_page(session, '/').find('a', string='Mina sidor')
    if link is None:
        error("Could not find the 'Mina sidor' link")
    page = get_page(session, link['href'])
    fieldnames, lifts = get_table(page, 'user_lifts_completed')

    ### Read and merge
    pay_content = try_read(PATH_PAY)
    if pay_content:
        try:
            dialect = csv.Sniffer().sniff(pay_content)
        except csv.Error:
            # Fall back to the dialect that we write the file with.
            dialect = CSV_DIALECT
        pay_lines = pay_content.splitlines()
        lifts += list(csv.DictReader(pay_lines, dialect=dialect))

    ### Deduplicate and sort lifts
    def key(lift):
        return (lift['Datum'], lift['Tid (rast)'])

    # The sort is stable (also with reverse=True), so among duplicates the
    # freshly scraped lift precedes the stored one and is the one kept.
    lifts = [
        next(group)
        for _, group in
        itertools.groupby(sorted(lifts, key=key, reverse=True), key=key)
    ]

    ### Write
    def pay_write(file):
        pay_writer = csv.DictWriter(file, fieldnames, dialect=CSV_DIALECT)
        pay_writer.writeheader()
        pay_writer.writerows(lifts)
    try_write(PATH_PAY, pay_write)

    ### Summarize
    summary = collections.defaultdict(float)
    for lift in lifts:
        year, month, _ = lift['Datum'].split('-')
        # The field holds two numbers, the second one in parentheses.
        hours, hours_inconvenient = [
            float(t.strip('()'))
            for t in lift['Rapporterad tid (ob)'].split()
        ]
        is_lead = bool(lift['Arbetsledare'])
        summary[f'{year}-{month}'] += (1 - tax) * (1 + PAY_HOLIDAY) * (
            hours * (PAY + is_lead * PAY_LEAD) +
            hours_inconvenient * PAY_INCONVENIENT
        )

    for period, amount in sorted(summary.items(), reverse=True):
        print(f'{period}: {amount: >5.0f}')


## Personnel
def personnel(session):
    """Print name, telephone number and position of every person in the
    users table, one delimiter-separated line per person."""
    ### Get page
    page = get_page(session, '/users')

    ### Get personnel
    _, persons = get_table(page, 'users_table')
    for person in persons:
        print(join([
            person['Namn'],
            telephone(person['Mobil']),
            person['Position'],
        ]))


## Report
def _parse_date(value, name):
    """Parse `value` (a `datetime.date` or an ISO 8601 date string) into
    a `datetime.date`. An invalid date is reported through `error`, with
    `name` identifying the offending argument."""
    try:
        return datetime.date.fromisoformat(str(value))
    except ValueError:
        error(f"Invalid {name}: {value}")


def _timediff(a, b):
    """Return the number of hours from `b` to `a`, both "HH:MM" strings.
    Negative when `a` is earlier than `b`."""
    ah, am = map(float, a.split(':'))
    bh, bm = map(float, b.split(':'))
    return (ah - bh) + (am - bm) / 60


def report(session, date_from, date_to, region, project):
    """Print a summary of every lift report in a date range.

    `date_from` and `date_to` are ISO 8601 date strings (both
    inclusive); `date_from` defaults to yesterday and `date_to` to
    `date_from`. `region` and `project` are labels of the corresponding
    drop-down lists on the reports page and default to their currently
    selected options.

    For every report the status, the workers (number, lead and summed
    regular/inconvenient hours) and the non-zero services are printed.
    Warnings about suspicious reported hours go to stderr. Pages of
    attested reports are cached.
    """
    ### Get page
    overview = get_page(session, '/reports')

    ### Arguments
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    if date_from is None:
        date_from = yesterday
    if date_to is None:
        date_to = date_from
    if region is None:
        region = get_selected(overview, 'region')
    if project is None:
        project = get_selected(overview, 'project')

    date_from = _parse_date(date_from, 'date_from')
    date_to = _parse_date(date_to, 'date_to')
    region = get_option(overview, region, 'region')
    project = get_option(overview, project, 'project')

    ### Get reports
    url = f'/reports/index/{region}/{date_from}/{date_to}/{project}'
    listing = get_page(session, url)

    ### Process reports
    for row in listing.select('table#lift_list tr'):
        #### Get page
        status = get_status(row)
        # Exactly four cells are expected; the last one is not used.
        cell_project, cell_date_time, cell_lift, _ = row.select('td')
        project_name = text(cell_project)
        lift = text(cell_lift.find('a'))
        date = text(cell_date_time).split()[0]

        cache = status == 'attested'
        page = get_page(session, f'/reports/view/lift/{lift}', cache=cache)

        print(f'{date} {project_name}')
        print(f'{INDENT}Status: {status}')

        #### Get workers
        _, workers = get_table(page, 'tr_list')

        number = len(workers)
        lead = next((w for w in workers if w['Arbetsledare']), None)
        if lead:
            lead = ' '.join(filter(None, [
                lead['Namn'],
                '(Sjuk)' if lead['Sjuk'] else None
            ]))

        time_regular = 0
        time_inconvenient = 0
        for worker in workers:
            worker_time = float(worker['Timmar'])
            worker_rest = float(worker['Rast (minuter)']) / 60
            worker_time_calc = max(
                TIME_MIN,
                _timediff(worker['Slut'], worker['Start']) - worker_rest,
            )

            # Compare with a tolerance: the calculated time goes through
            # float arithmetic (e.g. 9.000000000000002 for 9 hours), so an
            # exact comparison produces spurious warnings.
            if worker['Sjuk']:
                if not math.isclose(worker_time, TIME_MIN, abs_tol=1e-9):
                    log(
                        f"{INDENT}"
                        f"Warning: "
                        f"Time for sick worker {worker['Namn']} is incorrect: "
                        f"{worker_time} != {TIME_MIN}"
                    )
            else:
                if not math.isclose(
                    worker_time, worker_time_calc, abs_tol=1e-9,
                ):
                    log(
                        f"{INDENT}"
                        f"Warning: "
                        f"Time for worker {worker['Namn']} is inconsistent: "
                        f"{worker_time} != {worker_time_calc}"
                    )

            # NOTE(review): the accumulation below is applied to every
            # worker, sick or not -- confirm that this nesting level is
            # the intended one.
            worker_time_inconvenient = max(
                0,
                _timediff(worker['Slut'], TIME_INCONVENIENT_START)
                - worker_rest,
            )

            time_regular += worker_time - worker_time_inconvenient
            time_inconvenient += worker_time_inconvenient

        print_category(1, 'Workers', {
            'Number': number,
            'Lead': lead,
            'Time regular': time_regular,
            'Time inconvenient': time_inconvenient,
        })

        #### Get services
        services = collections.defaultdict(float)
        for service in page.select('table#kr_list tr.service'):
            name = service.find('td', class_='label').contents[0]
            value = float(service.find('input')['value'])
            if value:
                services[name] += value

        if services:
            print_category(1, 'Services', services)


## Path
def path():
    """Print the data directory (`PATH`)."""
    print(PATH)


## Main
def main():
    """Parse the command line and run the selected command."""
    args = docopt.docopt(__doc__, version=__doc__.splitlines()[1])

    # The commands are mutually exclusive, see the usage patterns.
    if args['pay']:
        pay(
            login(),
            args['--tax'],
        )
    elif args['personnel']:
        personnel(login())
    elif args['report']:
        report(
            login(),
            args['--date-from'],
            args['--date-to'],
            args['--region'],
            args['--project'],
        )
    elif args['path']:
        path()


if __name__ == '__main__':
    main()