bygglogistik_utils.py
cbbff016
 #!/usr/bin/env python3
 
 
 """
 bygglogistik-utils 1.0.0
 
 Scrape intranet.bygglogistik.se.
 
 Usage:
   bygglogistik-utils pay [--tax=<tax>]
   bygglogistik-utils personnel
   bygglogistik-utils report [-f <date_from>] [-t <date_to>] [-r <region>] [-p <project>]
   bygglogistik-utils path
   bygglogistik-utils -h|--help
   bygglogistik-utils --version
 
 Options:
   --tax=<tax>
     The tax rate, between 0.0 and 1.0.
     Defaults to 0.30.
   -f <date_from>, --date-from=<date_from>
     The date (in an ISO 8601 format) from which to gather reports (inclusive).
     Defaults to yesterday.
   -t <date_to>, --date-to=<date_to>
     The date (in an ISO 8601 format) to which to gather reports (inclusive).
     Defaults to <date_from>.
   -r <region>, --region=<region>
     The region of the report.
     Defaults to the region of the logged in user.
   -p <project>, --project=<project>
     The project of the report.
     Defaults to all projects.
 """
 
 
 ## Imports
 import sys
 import os
 import collections
 import itertools
 import functools
 import json
 import csv
 import datetime
 import time
 import random
 import warnings
 import docopt
 import requests
 import bs4
 
 
 ## Constants
 
 ### Connection
 # Base URL that get_page() prepends to every (site-relative) page URL.
 URL_BASE = 'https://intranet.bygglogistik.se'
 
 ### Parsing
 # Parser name handed to bs4.BeautifulSoup for every fetched or cached page.
 PARSER = 'lxml'
 
 ### Storage
 # CSV dialect used to write the pay file; its delimiter also separates the
 # columns printed by join().
 CSV_DIALECT = csv.excel_tab
 
 ### Pay
 # TODO: Make this data time-dependent.
 # Rates used by pay(). PAY is multiplied with the reported hours, PAY_LEAD is
 # added to PAY when the 'Arbetsledare' column is non-empty, PAY_INCONVENIENT
 # is multiplied with the inconvenient ("ob") hours, and PAY_HOLIDAY is a
 # fraction applied as the factor (1 + PAY_HOLIDAY).
 # NOTE(review): presumably SEK per hour; the meaning of the extra 5.0 in PAY
 # is not visible here -- confirm.
 PAY              = 168.83 + 5.0
 PAY_LEAD         = 45.13
 PAY_INCONVENIENT = 35.32
 PAY_HOLIDAY      = 0.13
 
 ### Paths
 # Data directory: $BYGGLOGISTIK_PATH when set (and non-empty), otherwise the
 # 'bygglogistik-utils' directory below the first of $APPDATA,
 # $XDG_CONFIG_HOME or $HOME/.config. Raises KeyError at import time when none
 # of these variables (including HOME) is set.
 PATH = (
     os.environ.get('BYGGLOGISTIK_PATH') or
     os.path.join(
         (
             os.environ.get('APPDATA') or
             os.environ.get('XDG_CONFIG_HOME') or
             os.path.join(os.environ['HOME'], '.config')
         ),
         'bygglogistik-utils',
     )
 )
 # File read by login() for the stored credentials (parsed as JSON).
 PATH_LOGIN = os.path.join(PATH, 'login')
 # CSV file in which pay() accumulates the completed lifts.
 PATH_PAY   = os.path.join(PATH, 'pay')
 # Directory below which get_page() stores cached pages.
 PATH_CACHE = os.path.join(PATH, 'cache')
 
 ### Sleep
 # Inclusive bounds, in seconds, of the random pause get_page() takes after it
 # has fetched and cached a page.
 SLEEP_MIN = 5
 SLEEP_MAX = 10
 
 ### Output
 # One level of indentation in the printed report.
 INDENT = ' ' * 2
 
 
 ## Warnings
 # Silence bs4's XMLParsedAsHTMLWarning for the whole process.
 # NOTE(review): presumably the intranet pages look like XML to bs4 while being
 # parsed with the HTML parser configured in PARSER -- confirm.
 warnings.filterwarnings('ignore', category=bs4.XMLParsedAsHTMLWarning)
 
 
 ## Error
 def error(message):
     """Print `message` as an error on standard error and exit with status 1.

     This function never returns: it raises SystemExit.
     """
     print(f"Error: {message}", file=sys.stderr)
     # Use sys.exit rather than the `exit` builtin: the latter is injected by
     # the `site` module for interactive use and is not guaranteed to exist
     # (e.g. under `python -S` or in frozen applications).
     sys.exit(1)
 
 
 ## Log
 def log(message):
     """Write `message`, followed by a newline, to standard error."""
     stream = sys.stderr
     print(message, file=stream, end='\n')
 
 
 ## Print category
 def print_category(indent, category, items):
     """Print a category heading followed by its aligned key/value pairs.

     Args:
         indent: Number of INDENT levels to print the heading at; the pairs
             are printed one level deeper.
         category: The heading text.
         items: Mapping of string keys to the values to print. An empty
             mapping prints only the heading.
     """
     prefix = INDENT * indent
     print(f'{prefix}{category}')
     prefix += INDENT
     # Column width: the longest key plus its trailing colon. `default` keeps
     # max() from raising ValueError on an empty mapping.
     width = 1 + max(map(len, items), default=0)
     for key, value in items.items():
         label = key + ':'
         print(f'{prefix}{label: <{width}} {value}')
 
 
 ## Try read
 def try_read(path):
     """Return the text content of `path`, or None when the file is missing.

     Any other OSError while opening or reading is reported through error().
     """
     content = None
     try:
         with open(path) as handle:
             content = handle.read()
     except FileNotFoundError:
         # A missing file is an expected situation; the result stays None.
         pass
     except OSError as exc:
         error(f"Could not read '{path}': {exc.strerror}")
     return content
 
 
 ## Try write
 def try_write(path, func):
     """Open `path` for writing as text and let `func` fill it.

     Missing parent directories are created with mode 0o700. Any OSError,
     whether from creating the directories, opening the file or from `func`
     itself, is reported through error().

     Args:
         path: Path of the file to (over)write.
         func: Callable that receives the open file object and writes to it.
     """
     directory = os.path.dirname(path)
     try:
         # A bare file name has no directory part, and os.makedirs('') raises
         # FileNotFoundError, so only create a directory when there is one.
         if directory:
             os.makedirs(directory, mode=0o700, exist_ok=True)
         with open(path, 'w') as file:
             func(file)
     except OSError as e:
         error(f"Could not write '{path}': {e.strerror}")
 
 
 ## Text
 def text(element):
     """Return the element's stripped strings joined by single spaces.

     Every "- " sequence is then removed from the joined string (presumably
     to re-join words that were hyphenated across strings).
     """
     pieces = element.stripped_strings
     joined = " ".join(pieces)
     return joined.replace("- ", "")
 
 
 ## Texts
 def texts(iterable):
     """Return a list with text() applied to every element of `iterable`."""
     return [text(element) for element in iterable]
 
 
 ## Telephone
 def telephone(s):
     """Format a telephone number as ``NNN-NNN NN NN``.

     All non-digit characters of `s` are ignored. A Swedish international
     prefix (``+46`` or ``0046``) is replaced by the national trunk ``0`` so
     that such numbers are grouped like national ones.

     Args:
         s: The telephone number as free-form text.

     Returns:
         The formatted number, or an empty string when `s` has no digits.
     """
     # https://en.wikipedia.org/wiki/National_conventions_for_writing_telephone_numbers#Sweden
     # https://en.wikipedia.org/wiki/Telephone_numbers_in_Sweden#Area_codes
     digits = ''.join(filter(str.isdigit, s))
     if not digits:
         # Without this guard an empty cell would be rendered as "-  ".
         return ''
     if digits.startswith('0046'):
         prefix_length = 4
     elif s.lstrip().startswith('+46'):
         prefix_length = 2
     else:
         prefix_length = 0
     if prefix_length:
         # Also drops a redundant trunk zero, as in "+46 (0)70 ...".
         digits = '0' + digits[prefix_length:].lstrip('0')
     return f"{digits[:3]}-{digits[3:6]} {digits[6:8]} {digits[8:]}"
 
 
 ## Join
 def join(iterable):
     """Join the strings of `iterable` with the delimiter of CSV_DIALECT."""
     separator = CSV_DIALECT.delimiter
     return separator.join(iterable)
 
 
 ## Get page
 def get_page(session, url, data=None, cache=False, timeout=60):
     """Fetch a page of the intranet and return it parsed by BeautifulSoup.

     Args:
         session: The requests session to send the request with.
         url: Site-relative URL; it is appended to URL_BASE.
         data: Optional form data. When given (and non-empty) the request is a
             POST and caching is disabled; otherwise it is a GET.
         cache: Whether to read the page from, and store it in, the cache
             below PATH_CACHE.
         timeout: Seconds to wait for the server before giving up.

     Returns:
         The parsed page.

     Connection problems, timeouts and non-OK status codes are reported
     through error().
     """
     if data:
         cache = False
     path_cache = f'{PATH_CACHE}/{url}'
     content = None
     if cache:
         content = try_read(path_cache)
     if not content:
         method = 'POST' if data else 'GET'
         try:
             # Without a timeout requests waits forever on a stalled server.
             response = session.request(
                 method, f'{URL_BASE}{url}', data=data, timeout=timeout,
             )
         except requests.RequestException as e:
             error(f"Could not fetch '{url}': {e}")
         if not response.ok:
             error(f"Got status code {response.status_code}")
         response.encoding = response.apparent_encoding
         content = response.text
         if cache:
             def cache_write(file):
                 file.write(content)
             try_write(path_cache, cache_write)
             # We use the fact that we're caching as a heuristic that we may be
             # putting the server under strain. Sleep for a random amount of
             # time to give it some breathing room.
             time_sleep = random.randrange(SLEEP_MIN, SLEEP_MAX+1)
             log(f"Info: Sleeping for {time_sleep} seconds")
             time.sleep(time_sleep)
     return bs4.BeautifulSoup(content, PARSER)
 
 
 ## Get table
 def get_table(page, id):
     """Extract the HTML table with the given id as a list of dicts.

     Rows whose id attribute is "comment" are skipped. The first remaining
     row supplies the field names; every later row becomes a dict keyed by
     them.

     Args:
         page: The parsed page to search.
         id: The id attribute of the table element.

     Returns:
         A tuple ``(fieldnames, rows)``.

     A missing table, or one without any rows, is reported through error().
     """
     table = [
         texts(row.select('th, td'))
         for row in
         page.select(f'table#{id} tr:not([id="comment"])')
     ]
     if not table:
         # Without this guard the header lookup below raises IndexError.
         error(f"Could not find any rows in table '{id}'")
     fieldnames, rows = table[0], table[1:]
     return fieldnames, [dict(zip(fieldnames, row)) for row in rows]
 
 
 ## Get selected
 def get_selected(page, id):
     """Return the text of the selected option of the select with the given id.

     When no option carries the `selected` attribute the first option is
     used, which is what a browser would display. A select without any
     options is reported through error().
     """
     selected = page.select_one(f'select#{id} > option:checked')
     if selected is None:
         # `:checked` only matches options with an explicit `selected`.
         selected = page.select_one(f'select#{id} > option')
     if selected is None:
         error(f"Could not find any {id} option")
     return text(selected)
 
 
 ## Get option
 def get_option(page, option, id):
     """Return the value attribute of the option whose text matches `option`.

     The comparison is case-insensitive (both sides are case-folded). When
     nothing matches, the valid option texts are listed through error().
     """
     wanted = option.casefold()
     choices = {}
     for candidate in page.select(f'select#{id} > option'):
         label = text(candidate).casefold()
         choices[label] = candidate['value']
     if wanted in choices:
         return choices[wanted]
     lines = [f"No such {id}: {wanted}. Valid {id}s:"]
     lines.extend(f'  {label}' for label in choices)
     error("\n".join(lines))
     return choices[wanted]
 
 
 ## Get status
 def get_status(element):
     """Translate the element's ``status<digit>`` CSS class into a status name.

     The last character of the first class starting with 'status' selects
     'not sent' (0), 'sent' (1) or 'attested' (2).
     """
     names = {
         0: 'not sent',
         1: 'sent',
         2: 'attested',
     }
     marker = next(
         css_class
         for css_class in element['class']
         if css_class.startswith('status')
     )
     code = int(marker[-1])
     return names[code]
 
 
 ## Login
 def login():
     """Log in to the intranet and return the authenticated session.

     The email and password are read from the JSON object stored in
     PATH_LOGIN (keys 'email' and 'password'); the environment variables
     BYGGLOGISTIK_EMAIL and BYGGLOGISTIK_PASSWORD take precedence when set.

     Returns:
         A requests.Session that has posted the credentials to '/login'.

     Missing credentials, an unparsable login file and a failed login are
     reported through error().
     """
     ### Get login email and password
     email    = None
     password = None
     login_content = try_read(PATH_LOGIN)
     if login_content:
         try:
             login_json = json.loads(login_content)
         except json.JSONDecodeError as e:
             error(f"Could not parse '{PATH_LOGIN}': {e}")
         if not isinstance(login_json, dict):
             # Valid JSON that is not an object has no `.get`.
             error(f"Could not parse '{PATH_LOGIN}': expected a JSON object")
         email    = login_json.get('email')
         password = login_json.get('password')
     email    = os.environ.get('BYGGLOGISTIK_EMAIL')    or email
     password = os.environ.get('BYGGLOGISTIK_PASSWORD') or password
     if not (email and password):
         error("Could not get login email and password")
     ### Login
     session = requests.Session()
     page = get_page(session, '/login', {
         'data[User][email]':    email,
         'data[User][password]': password,
     })
     # The login button still being on the returned page means the login
     # was rejected.
     if page.find('input', value='Logga in'):
         error("Could not log in")
     return session
 
 
 ## Pay
 def pay(session, tax):
     """Print the estimated pay after tax per month, newest month first.

     The completed lifts listed on the user's 'Mina sidor' page are merged
     with those already stored in PATH_PAY, deduplicated on date and time,
     written back to PATH_PAY and summarized per ``YYYY-MM`` period.

     Args:
         session: A logged-in requests session.
         tax: The tax rate between 0.0 and 1.0 as a number or string, or None
             for the default of 0.30.

     An invalid tax rate and a missing 'Mina sidor' link are reported through
     error().
     """
     ### Arguments
     if tax is None:
         tax = 0.30
     try:
         tax = float(tax)
     except (TypeError, ValueError):
         error(f"Invalid tax rate: {tax}")
     # Written so that NaN is rejected as well.
     if not 0.0 <= tax <= 1.0:
         error(f"Tax rate must be between 0.0 and 1.0: {tax}")
     ### Get data
     link = get_page(session, '/').find('a', string='Mina sidor')
     if link is None:
         error("Could not find the 'Mina sidor' link")
     page = get_page(session, link['href'])
     fieldnames, lifts = get_table(page, 'user_lifts_completed')
     ### Read and merge
     pay_content = try_read(PATH_PAY)
     if pay_content:
         dialect   = csv.Sniffer().sniff(pay_content)
         pay_lines = pay_content.splitlines()
         lifts.extend(csv.DictReader(pay_lines, dialect=dialect))
     ### Deduplicate and sort lifts
     def key(lift):
         return [lift['Datum'], lift['Tid (rast)']]
     # sorted() is stable, so for duplicates the lift from the page (listed
     # first) wins over the stored one.
     lifts = [
         next(group)
         for _, group in
         itertools.groupby(sorted(lifts, key=key), key=key)
     ]
     lifts.sort(key=key, reverse=True)
     ### Write
     def pay_write(file):
         pay_writer = csv.DictWriter(file, fieldnames, dialect=CSV_DIALECT)
         pay_writer.writeheader()
         pay_writer.writerows(lifts)
     try_write(PATH_PAY, pay_write)
     ### Summarize
     summary = collections.defaultdict(float)
     for lift in lifts:
         year, month, _ = lift['Datum'].split('-')
         # The cell holds two numbers, the second one in parentheses:
         # the reported hours and the inconvenient ("ob") hours.
         hours, hours_inconvenient = [
             float(t.strip('()'))
             for t in
             lift['Rapporterad tid (ob)'].split()
         ]
         is_lead = bool(lift['Arbetsledare'])
         summary[f'{year}-{month}'] += (1 - tax) * (1 + PAY_HOLIDAY) * (
             hours * (PAY + is_lead * PAY_LEAD) +
             hours_inconvenient * PAY_INCONVENIENT
         )
     for period, amount in sorted(summary.items(), reverse=True):
         print(f'{period}: {amount: >5.0f}')
 
 
 ## Personnel
 def personnel(session):
     """Print name, formatted mobile number and position of every user.

     The rows come from the 'users_table' table on the '/users' page; each
     person is printed on one line with the columns joined by join().
     """
     users_page = get_page(session, '/users')
     _, persons = get_table(users_page, 'users_table')
     for person in persons:
         columns = [
             person['Namn'],
             telephone(person['Mobil']),
             person['Position'],
         ]
         line = join(columns)
         print(line)
 
 
 ## Report
 def report(session, date_from, date_to, region, project):
     """Print a summary of every lift report in the given date range.

     For each row of the 'lift_list' table the lift's report page is fetched
     (cached when the report is attested) and its status, workers and
     services are printed. Inconsistencies in the reported worker times are
     logged as warnings on standard error.

     Args:
         session: A logged-in requests session.
         date_from: First date (ISO 8601 string or date), or None for
             yesterday.
         date_to: Last date (ISO 8601 string or date), or None for
             `date_from`.
         region: Region name, or None for the one selected on the page.
         project: Project name, or None for the one selected on the page.

     Invalid dates are reported through error().
     """
     ### Arguments (dates)
     # Parsed before any request so that a typo fails fast.
     yesterday = datetime.date.today() - datetime.timedelta(days=1)
     if date_from is None: date_from = yesterday
     if date_to   is None: date_to   = date_from
     try:
         date_from = datetime.date.fromisoformat(str(date_from))
         date_to   = datetime.date.fromisoformat(str(date_to))
     except ValueError as e:
         error(f"Invalid date: {e}")
     ### Get page
     page = get_page(session, '/reports')
     ### Arguments (region and project)
     if region    is None: region    = get_selected(page, 'region')
     if project   is None: project   = get_selected(page, 'project')
     region    = get_option(page, region,  'region')
     project   = get_option(page, project, 'project')
     ### Get reports
     url = f'/reports/index/{region}/{date_from}/{date_to}/{project}'
     page = get_page(session, url)
     ### Process reports
     for row in page.select('table#lift_list tr'):
         #### Get page
         status = get_status(row)
         # The fourth cell is not used.
         cell_project, cell_date_time, cell_lift, _ = row.select('td')
         project_name = text(cell_project)
         lift         = text(cell_lift.find('a'))
         date         = text(cell_date_time).split()[0]
         # Only attested reports are cached.
         # NOTE(review): presumably because they can no longer change -- confirm.
         cache        = status == 'attested'
         lift_page = get_page(
             session, f'/reports/view/lift/{lift}', cache=cache,
         )
         print(f'{date} {project_name}')
         print(f'{INDENT}Status: {status}')
         #### Get workers
         _, workers = get_table(lift_page, 'tr_list')
         number = len(workers)
         lead = next(filter(lambda w: w['Arbetsledare'], workers), None)
         if lead:
             lead = ' '.join(filter(None, [
                 lead['Namn'],
                 '(Sjuk)' if lead['Sjuk'] else None
             ]))
         time_regular      = 0
         time_inconvenient = 0
         for worker in workers:
             worker_time = float(worker['Timmar'])
             # Expected time: end minus start minus break, at least 4 hours.
             worker_time_calc = max(4.0, (
                 _timediff(worker['Slut'], worker['Start']) -
                 float(worker['Rast (minuter)']) / 60
             ))
             if worker['Sjuk']:
                 if worker_time != 4.0:
                     log(
                         f"{INDENT}"
                         f"Warning: "
                         f"Time for sick worker {worker['Namn']} is incorrect: "
                         f"{worker_time} != 4.0"
                     )
             else:
                 # Compare with a tiny tolerance: the calculated time goes
                 # through binary floating point arithmetic, so exact
                 # equality would warn about mere rounding noise.
                 if abs(worker_time - worker_time_calc) > 1e-9:
                     log(
                         f"{INDENT}"
                         f"Warning: "
                         f"Time for worker {worker['Namn']} is inconsistent: "
                         f"{worker_time} != {worker_time_calc}"
                     )
             # Time worked after 18:00, minus the break, never negative.
             worker_time_inconvenient = max(0,
                 _timediff(worker['Slut'], '18:00') -
                 float(worker['Rast (minuter)']) / 60
             )
             time_regular      += worker_time - worker_time_inconvenient
             time_inconvenient += worker_time_inconvenient
         print_category(1, 'Workers', {
             'Number':            number,
             'Lead':              lead,
             'Time regular':      time_regular,
             'Time inconvenient': time_inconvenient,
         })
         #### Get services
         services = collections.defaultdict(float)
         for service in lift_page.select('table#kr_list tr.service'):
             name  = service.find('td', class_='label').contents[0]
             value = float(service.find('input')['value'])
             if value:
                 services[name] += value
         if services:
             print_category(1, 'Services', services)


 ## Time difference
 def _timediff(a, b):
     """Return the hours from clock time `b` to `a`, both 'HH:MM' strings."""
     ah, am = map(float, a.split(':'))
     bh, bm = map(float, b.split(':'))
     return (ah - bh) + (am - bm) / 60
 
 
 ## Path
 def path():
     """Print the data directory (PATH) on standard output."""
     location = PATH
     print(location)
 
 
 ## Main
 def main():
     """Parse the command line and run every command that was given.

     The usage text and the version (its second line) both come from the
     module docstring. Commands that need the intranet log in first.
     """
     version = __doc__.splitlines()[1]
     args = docopt.docopt(__doc__, version=version)
     # Command name -> action, checked in the same order as before.
     commands = (
         ('pay', lambda: pay(
             login(),
             args['--tax'],
         )),
         ('personnel', lambda: personnel(login())),
         ('report', lambda: report(
             login(),
             args['--date-from'],
             args['--date-to'],
             args['--region'],
             args['--project'],
         )),
         ('path', lambda: path()),
     )
     for command, run in commands:
         if args[command]:
             run()
 
 
 # Run the command line interface only when executed as a script, not when
 # the module is imported.
 if __name__ == '__main__':
     main()