cbbff016 |
#!/usr/bin/env python3
"""
bygglogistik-utils 1.0.0
Scrape intranet.bygglogistik.se.
Usage:
bygglogistik-utils pay [--tax=<tax>]
bygglogistik-utils personnel
bygglogistik-utils report [-f <date_from>] [-t <date_to>] [-r <region>] [-p <project>]
bygglogistik-utils path
bygglogistik-utils -h|--help
bygglogistik-utils --version
Options:
--tax=<tax>
The tax rate, between 0.0 and 1.0.
Defaults to 0.30.
-f <date_from>, --date-from=<date_from>
The date (in an ISO 8601 format) from which to gather reports (inclusive).
Defaults to yesterday.
-t <date_to>, --date-to=<date_to>
The date (in an ISO 8601 format) to which to gather reports (inclusive).
Defaults to <date_from>.
-r <region>, --region=<region>
The region of the report.
Defaults to the region of the logged in user.
-p <project>, --project=<project>
The project of the report.
Defaults to all projects.
"""
## Imports
import sys
import os
import collections
import itertools
import functools
import json
import csv
import datetime
import time
import random
import warnings
import docopt
import requests
import bs4
## Constants
### Connection
# Scheme and host that get_page() prepends to every request path.
URL_BASE = 'https://intranet.bygglogistik.se'
### Parsing
# Parser name handed to bs4.BeautifulSoup for every fetched page.
PARSER = 'lxml'
### Storage
# Dialect used when writing the pay file; join() also borrows its delimiter.
CSV_DIALECT = csv.excel_tab
### Pay
# TODO: Make this data time-dependent.
# Rates used by pay(): PAY is applied per reported hour, PAY_LEAD is added per
# hour when the lift's 'Arbetsledare' field is non-empty, PAY_INCONVENIENT is
# applied per inconvenient hour, and PAY_HOLIDAY enters as a
# (1 + PAY_HOLIDAY) multiplier on the total.
# NOTE(review): presumably SEK per hour and a holiday-pay fraction -- confirm.
PAY = 168.83 + 5.0
PAY_LEAD = 45.13
PAY_INCONVENIENT = 35.32
PAY_HOLIDAY = 0.13
### Paths
# Storage directory. BYGGLOGISTIK_PATH wins when set; otherwise the directory
# 'bygglogistik-utils' inside the first of APPDATA, XDG_CONFIG_HOME or
# $HOME/.config. os.environ['HOME'] is only evaluated (and may raise KeyError)
# when none of the earlier variables is set to a non-empty value.
PATH = (
    os.environ.get('BYGGLOGISTIK_PATH') or
    os.path.join(
        (
            os.environ.get('APPDATA') or
            os.environ.get('XDG_CONFIG_HOME') or
            os.path.join(os.environ['HOME'], '.config')
        ),
        'bygglogistik-utils',
    )
)
# JSON file from which login() reads the 'email' and 'password' keys.
PATH_LOGIN = os.path.join(PATH, 'login')
# CSV file in which pay() accumulates lifts between runs.
PATH_PAY = os.path.join(PATH, 'pay')
# Directory below which get_page() stores cached page text.
PATH_CACHE = os.path.join(PATH, 'cache')
### Sleep
# Inclusive bounds, in seconds, of the random pause get_page() takes after
# fetching a page that it caches.
SLEEP_MIN = 5
SLEEP_MAX = 10
### Output
# One level of indentation in printed output.
INDENT = ' ' * 2
## Warnings
# Silence bs4's XMLParsedAsHTMLWarning for the whole process.
warnings.filterwarnings('ignore', category=bs4.XMLParsedAsHTMLWarning)
## Error
def error(message):
    """Print *message* to standard error, prefixed with "Error: ", and exit.

    The process terminates with exit status 1 by raising ``SystemExit``.

    ``sys.exit`` is used rather than the ``exit`` builtin: the latter is
    injected by the ``site`` module for interactive use, is missing when
    Python runs with ``-S`` or from some frozen builds, and closes
    ``sys.stdin`` as a side effect.
    """
    print(f"Error: {message}", file=sys.stderr)
    sys.exit(1)
## Log
def log(message):
    """Write *message*, followed by a newline, to standard error."""
    stream = sys.stderr
    print(message, file=stream)
## Print category
def print_category(indent, category, items):
    """Print a category heading followed by its aligned ``key: value`` lines.

    Parameters:
        indent: Number of INDENT units placed before the heading; the
            entries are printed one unit deeper.
        category: Heading text.
        items: Mapping with string keys; each key gets a trailing colon and
            is left-aligned to the width of the longest key.

    An empty mapping prints only the heading.
    """
    prefix = INDENT * indent
    print(f'{prefix}{category}')
    prefix += INDENT
    # `default=0` keeps max() from raising ValueError on an empty mapping.
    width = 1 + max(map(len, items), default=0)
    for key, value in items.items():
        label = key + ':'
        print(f'{prefix}{label: <{width}} {value}')
## Try read
def try_read(path):
    """Return the text content of *path*, or ``None`` if it does not exist.

    Any other ``OSError`` raised while opening or reading is reported
    through ``error()``.
    """
    content = None
    try:
        with open(path) as handle:
            content = handle.read()
    except FileNotFoundError:
        # A missing file is an expected situation, not a failure.
        pass
    except OSError as exc:
        error(f"Could not read '{path}': {exc.strerror}")
    return content
## Try write
def try_write(path, func):
    """Open *path* for writing and let *func* produce its content.

    Parameters:
        path: Destination file. Missing parent directories are created
            with mode 0o700.
        func: Callable invoked with the open text file object.

    Any ``OSError`` -- whether from creating the directories, opening the
    file, or raised inside *func* -- is reported through ``error()``.
    """
    directory = os.path.dirname(path)
    try:
        # A bare filename has no directory part, and os.makedirs('') raises.
        if directory:
            os.makedirs(directory, mode=0o700, exist_ok=True)
        with open(path, 'w') as file:
            func(file)
    except OSError as e:
        error(f"Could not write '{path}': {e.strerror}")
## Text
def text(element):
    """Return the element's stripped strings joined by single spaces.

    Every occurrence of "- " is then removed from the joined string.
    """
    joined = " ".join(element.stripped_strings)
    return joined.replace("- ", "")
## Texts
def texts(iterable):
    """Return a list holding ``text(item)`` for every item of *iterable*."""
    return [text(item) for item in iterable]
## Telephone
def telephone(s):
    """Format the digits of *s* as ``NNN-NNN NN N...``.

    All non-digit characters are dropped; the remaining digits are split
    into groups of three, three, two and "the rest".
    """
    # https://en.wikipedia.org/wiki/National_conventions_for_writing_telephone_numbers#Sweden
    # https://en.wikipedia.org/wiki/Telephone_numbers_in_Sweden#Area_codes
    digits = ''.join(ch for ch in s if ch.isdigit())
    prefix, first, second, rest = digits[:3], digits[3:6], digits[6:8], digits[8:]
    return f"{prefix}-{first} {second} {rest}"
## Join
def join(iterable):
    """Join the strings of *iterable* with the delimiter of CSV_DIALECT."""
    delimiter = CSV_DIALECT.delimiter
    return delimiter.join(iterable)
## Get page
def get_page(session, url, data=None, cache=False):
    """Fetch *url* (relative to URL_BASE) and return it as parsed soup.

    Parameters:
        session: Session object whose ``request`` method performs the fetch.
        url: Path appended to URL_BASE; also used as the cache file path
            below PATH_CACHE.
        data: Form data. When truthy the request is a POST and caching is
            disabled; otherwise the request is a GET.
        cache: Whether to read the page from, and store it in, the cache.

    A non-OK response is reported through ``error()``.
    """
    # Requests that carry form data are never served from or written to cache.
    use_cache = False if data else cache
    path_cache = f'{PATH_CACHE}/{url}'
    markup = try_read(path_cache) if use_cache else None
    if not markup:
        response = session.request(
            'POST' if data else 'GET',
            f'{URL_BASE}{url}',
            data=data,
        )
        if not response.ok:
            error(f"Got status code {response.status_code}")
        response.encoding = response.apparent_encoding
        markup = response.text
        if use_cache:
            try_write(path_cache, lambda file: file.write(markup))
            # We use the fact that we're caching as a heuristic that we may be
            # putting the server under strain. Sleep for a random amount of
            # time to give it some breathing room.
            time_sleep = random.randrange(SLEEP_MIN, SLEEP_MAX+1)
            log(f"Info: Sleeping for {time_sleep} seconds")
            time.sleep(time_sleep)
    return bs4.BeautifulSoup(markup, PARSER)
## Get table
def get_table(page, id):
    """Extract the table with the given element id as a list of dicts.

    Rows whose own id is "comment" are skipped. The first remaining row
    supplies the field names; every later row becomes a dict mapping those
    names to the row's cell texts.

    Returns:
        A ``(fieldnames, rows)`` tuple.
    """
    selector = f'table#{id} tr:not([id="comment"])'
    cells = [texts(tr.select('th, td')) for tr in page.select(selector)]
    fieldnames = cells[0]
    records = []
    for row in cells[1:]:
        records.append(dict(zip(fieldnames, row)))
    return fieldnames, records
## Get selected
def get_selected(page, id):
    """Return the text of the checked option of the select element *id*."""
    selector = f'select#{id} > option:checked'
    chosen = page.select_one(selector)
    return text(chosen)
## Get option
def get_option(page, option, id):
    """Translate an option's visible text into its ``value`` attribute.

    The lookup in the select element *id* is case-insensitive (casefolded).
    When no option matches, the valid option names are listed through
    ``error()``.
    """
    wanted = option.casefold()
    values = {}
    for candidate in page.select(f'select#{id} > option'):
        name = text(candidate).casefold()
        values[name] = candidate['value']
    if wanted not in values:
        lines = [f"No such {id}: {wanted}. Valid {id}s:"]
        lines.extend(f' {name}' for name in values)
        error("\n".join(lines))
    return values[wanted]
## Get status
def get_status(element):
    """Map the element's ``status<N>`` CSS class to a readable status.

    The first class starting with "status" is used, and its last character
    is interpreted as the status number (0, 1 or 2).
    """
    labels = {
        0: 'not sent',
        1: 'sent',
        2: 'attested',
    }
    status_class = next(c for c in element['class'] if c.startswith('status'))
    return labels[int(status_class[-1])]
## Login
def login():
    """Log in to the intranet and return the authenticated session.

    Credentials come from the JSON object stored in PATH_LOGIN (keys
    'email' and 'password'); the environment variables BYGGLOGISTIK_EMAIL
    and BYGGLOGISTIK_PASSWORD take precedence when set to a non-empty value.

    Missing credentials, an unparsable login file, or a failed login are
    reported through ``error()``.

    Returns:
        The ``requests.Session`` used for the login request.
    """
    ### Get login email and password
    email = None
    password = None
    login_content = try_read(PATH_LOGIN)
    if login_content:
        # Report a broken login file like every other failure in this tool
        # instead of letting a raw traceback escape.
        try:
            login_json = json.loads(login_content)
        except json.JSONDecodeError as e:
            error(f"Could not parse '{PATH_LOGIN}': {e}")
        if not isinstance(login_json, dict):
            error(f"Could not parse '{PATH_LOGIN}': expected a JSON object")
        email = login_json.get('email')
        password = login_json.get('password')
    email = os.environ.get('BYGGLOGISTIK_EMAIL') or email
    password = os.environ.get('BYGGLOGISTIK_PASSWORD') or password
    if not (email and password):
        error("Could not get login email and password")
    ### Login
    session = requests.Session()
    page = get_page(session, '/login', {
        'data[User][email]': email,
        'data[User][password]': password,
    })
    # If the response still contains the login button, the login was rejected.
    if page.find('input', value='Logga in'):
        error("Could not log in")
    return session
## Pay
def pay(session, tax):
    """Merge completed lifts into the pay file and print net pay per month.

    Parameters:
        session: Logged-in session passed on to ``get_page``.
        tax: Tax rate as a number or numeric string; ``None`` means 0.30.

    The lifts scraped from the user's page are combined with those already
    stored in PATH_PAY, deduplicated on ('Datum', 'Tid (rast)'), written
    back, and summarized per ``YYYY-MM`` period.
    """
    ### Arguments
    tax = float(0.30 if tax is None else tax)
    ### Get data
    front_page = get_page(session, '/')
    url = front_page.find('a', string='Mina sidor')['href']
    page = get_page(session, url)
    fieldnames, lifts = get_table(page, 'user_lifts_completed')
    ### Read and merge
    pay_content = try_read(PATH_PAY)
    if pay_content:
        dialect = csv.Sniffer().sniff(pay_content)
        stored = csv.DictReader(pay_content.splitlines(), dialect=dialect)
        lifts.extend(stored)
    ### Deduplicate and sort lifts
    def identity(lift):
        return [lift['Datum'], lift['Tid (rast)']]
    unique = []
    # The sort is stable, so for duplicates the freshly scraped lift (which
    # precedes the stored ones in `lifts`) is the one that is kept.
    for _, duplicates in itertools.groupby(sorted(lifts, key=identity), key=identity):
        unique.append(next(duplicates))
    unique.sort(key=identity, reverse=True)
    ### Write
    def pay_write(file):
        writer = csv.DictWriter(file, fieldnames, dialect=CSV_DIALECT)
        writer.writeheader()
        writer.writerows(unique)
    try_write(PATH_PAY, pay_write)
    ### Summarize
    summary = collections.defaultdict(float)
    net_factor = (1 - tax) * (1 + PAY_HOLIDAY)
    for lift in unique:
        year, month, _ = lift['Datum'].split('-')
        # The field holds two numbers, the second one wrapped in parentheses.
        hours, hours_inconvenient = [
            float(part.strip('()'))
            for part in lift['Rapporterad tid (ob)'].split()
        ]
        is_lead = bool(lift['Arbetsledare'])
        summary[f'{year}-{month}'] += net_factor * (
            hours * (PAY + is_lead * PAY_LEAD) +
            hours_inconvenient * PAY_INCONVENIENT
        )
    for period, amount in sorted(summary.items(), reverse=True):
        print(f'{period}: {amount: >5.0f}')
## Personnel
def personnel(session):
    """Print name, formatted mobile number and position of every user.

    One line is printed per row of the 'users_table' table on '/users',
    with the fields separated by the CSV_DIALECT delimiter.
    """
    ### Get page
    page = get_page(session, '/users')
    ### Get personnel
    _, persons = get_table(page, 'users_table')
    for person in persons:
        fields = (
            person['Namn'],
            telephone(person['Mobil']),
            person['Position'],
        )
        print(join(fields))
## Report
def report(session, date_from, date_to, region, project):
    """Print a summary of every lift report matching the given filters.

    Parameters:
        session: Logged-in session passed on to ``get_page``.
        date_from: First date (ISO format string) or ``None`` for yesterday.
        date_to: Last date (ISO format string) or ``None`` for *date_from*.
        region: Visible name of a region option, or ``None`` for the option
            currently selected on the '/reports' page.
        project: Visible name of a project option, or ``None`` for the option
            currently selected on the '/reports' page.

    For each row of the 'lift_list' table the status, worker figures and
    non-zero services are printed; inconsistencies in reported worker time
    are logged as warnings.

    NOTE(review): the indentation of this function was reconstructed; in
    particular the per-worker accumulation is assumed to run for sick and
    non-sick workers alike -- confirm against the original file.
    """
    ### Get page
    page = get_page(session, '/reports')
    ### Arguments
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    if date_from is None: date_from = yesterday
    if date_to is None: date_to = date_from
    if region is None: region = get_selected(page, 'region')
    if project is None: project = get_selected(page, 'project')
    # str() lets both date objects (the defaults) and strings pass through
    # the same validation.
    date_from = datetime.date.fromisoformat(str(date_from))
    date_to = datetime.date.fromisoformat(str(date_to))
    # Translate the visible option names into the values used in the URL.
    region = get_option(page, region, 'region')
    project = get_option(page, project, 'project')
    ### Get reports
    url = f'/reports/index/{region}/{date_from}/{date_to}/{project}'
    page = get_page(session, url)
    ### Process reports
    # Note: the loop rebinds `report`, `project` and `page`; the rows being
    # iterated were already selected from the listing page above.
    for report in page.select(f'table#lift_list tr'):
        #### Get page
        status = get_status(report)
        project, date_time, lift, report = report.select('td')
        project = text(project)
        lift = text(lift.find('a'))
        date_time = text(date_time).split()
        date, time = date_time[0], ''.join(date_time[1:])
        # Only attested reports are read from / written to the cache.
        cache = status == 'attested'
        page = get_page(session, f'/reports/view/lift/{lift}', cache=cache)
        print(f'{date} {project}')
        print(f'{INDENT}Status: {status}')
        #### Get workers
        _, workers = get_table(page, 'tr_list')
        number = len(workers)
        # First worker with a non-empty 'Arbetsledare' field, if any.
        lead = next(filter(lambda w: w['Arbetsledare'], workers), None)
        if lead:
            lead = ' '.join(filter(None, [
                lead['Namn'],
                '(Sjuk)' if lead['Sjuk'] else None
            ]))
        # Difference a - b in hours between two 'HH:MM' strings.
        def timediff(a, b):
            ah, am = map(float, a.split(':'))
            bh, bm = map(float, b.split(':'))
            return (ah - bh) + (am - bm) / 60
        time_regular = 0
        time_inconvenient = 0
        for worker in workers:
            worker_time = float(worker['Timmar'])
            # Expected time: end minus start minus break, but at least 4 hours.
            worker_time_calc = max(4.0, (
                timediff(worker['Slut'], worker['Start']) -
                float(worker['Rast (minuter)']) / 60
            ))
            if worker['Sjuk']:
                if worker_time != 4.0:
                    log(
                        f"{INDENT}"
                        f"Warning: "
                        f"Time for sick worker {worker['Namn']} is incorrect: "
                        f"{worker_time} != 4.0"
                    )
            else:
                if worker_time != worker_time_calc:
                    log(
                        f"{INDENT}"
                        f"Warning: "
                        f"Time for worker {worker['Namn']} is inconsistent: "
                        f"{worker_time} != {worker_time_calc}"
                    )
            # Hours worked after 18:00, minus the break, never below zero.
            worker_time_inconvenient = max(0,
                timediff(worker['Slut'], '18:00') -
                float(worker['Rast (minuter)']) / 60
            )
            time_regular += worker_time - worker_time_inconvenient
            time_inconvenient += worker_time_inconvenient
        print_category(1, 'Workers', {
            'Number': number,
            'Lead': lead,
            'Time regular': time_regular,
            'Time inconvenient': time_inconvenient,
        })
        #### Get services
        # Sum the non-zero input values per service label.
        services = collections.defaultdict(float)
        for service in page.select('table#kr_list tr.service'):
            name = service.find('td', class_='label').contents[0]
            value = float(service.find('input')['value'])
            if value:
                services[name] += value
        if services:
            print_category(1, 'Services', services)
## Path
def path():
    """Print the directory in which this tool stores its files."""
    location = PATH
    print(location)
## Main
def main():
    """Parse the command line and run the requested sub-command.

    The usage text is the module docstring, whose second line supplies the
    version string. Every command except ``path`` logs in first.
    """
    version = __doc__.splitlines()[1]
    options = docopt.docopt(__doc__, version=version)
    if options['pay']:
        pay(login(), options['--tax'])
    if options['personnel']:
        session = login()
        personnel(session)
    if options['report']:
        report(
            login(),
            options['--date-from'],
            options['--date-to'],
            options['--region'],
            options['--project'],
        )
    if options['path']:
        path()
# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()
|