diff --git a/.gitignore b/.gitignore index 63197bf..83b0290 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ +__pycache__/ /config.json /auth.json diff --git a/main.py b/main.py index 12a1d40..89f0e11 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,4 @@ +import logging import json import os import time @@ -6,11 +7,14 @@ from datetime import datetime, timezone from html.parser import HTMLParser from mastodon import Mastodon +import mods.elicense + class Handler: def __init__(self, app, st): - self.app = app - self.st = st + self.logging = logging.getLogger("app.handler") + self.app = app + self.st = st self.text = sanitize_html(st["content"]) self.tokens = self.text.split()[1:] @@ -18,7 +22,6 @@ class Handler: self.handle() def handle(self): - print(self.tokens) if self.tokens[0] == "watch": self.watch() else: @@ -32,45 +35,24 @@ class Handler: if elapsed.seconds < self.app.CONFIG["interval"]: self.app.reply(self.st, "accepted") - print("watching", date, slots) + mod = self.app.mods["elicense"] + mod.watch(self.st["id"], date, slots) class App: def __init__(self): + logging.basicConfig() + self.logging = logging.getLogger("app") + self.logging.setLevel(logging.DEBUG) + with open("config.json", "r") as f: self.CONFIG = json.load(f) + self.logging.info("configuration loaded") + self.M = self.authenticate() - self.run() + self.logging.info("authentication succeeded") - def reply(self, st, msg): - try: - print(msg) - #self.M.status_post( - # status = "@{} {}".format(st["account"]["acct"], msg), - # in_reply_to_id = st["id"]) - except Exception as e: - print("reply failure:", msg, "(", e, ")") - - def run(self): - while True: - try: - items = self.M.notifications( - account_id = self.CONFIG["master"], - types = "mention") - for item in items: - st = item["status"] - # handle the item - try: - Handler(self, st) - except Exception as e: - self.reply(st, "handling aborted: {}".format(e)) - self.M.notifications_dismiss(id = item["id"]) - - print("cycle done") - except Exception as e: - print("cycle aborted:", e) - time.sleep(self.CONFIG["interval"]) - return + self.mods = {} def authenticate(self): if os.path.exists("auth.json"): @@ -100,9 +82,50 @@ class App: dic = {"client_id": cid, "secret": secret, "token": tok} with open("auth.json", "w") as f: json.dump(dic, f, indent = 2) - return M + def install(self, name, mod): + self.logging.info("mod installed, "+name) + + logger = logging.getLogger("mod."+name) + self.mods[name] = mod.init(self.CONFIG["mods"][name], logger) + + def run(self): + self.install("elicense", mods.elicense) + while True: + try: + self.cycle() + self.logging.debug("cycle done") + except Exception as e: + self.logging.warning("cycle aborted:", e) + + for mod in self.mods: + self.mods[mod].cycle() + time.sleep(self.CONFIG["interval"]) + + def cycle(self): + items = self.M.notifications( + account_id = self.CONFIG["master"], + types = "mention") + for item in items: + st = item["status"] + try: + Handler(self, st) + except Exception as e: + self.reply(st, "handling aborted: {}".format(e)) + self.M.notifications_dismiss(id = item["id"]) + + # TODO: call mod cycles + + def reply(self, st, msg): + try: + self.logging.debug("reply:", msg) + #self.M.status_post( + # status = "@{} {}".format(st["account"]["acct"], msg), + # in_reply_to_id = st["id"]) + except Exception as e: + self.logging.error("reply failure:", msg, "(", e, ")") + def sanitize_html(html): class F(HTMLParser): @@ -115,4 +138,4 @@ def sanitize_html(html): f.feed(html) return f.text -App() +App().run() diff --git a/mods/elicense.py b/mods/elicense.py new file mode 100644 index 0000000..a7bfabd --- /dev/null +++ b/mods/elicense.py @@ -0,0 +1,81 @@ +import logging +import json +import requests + +from html.parser import HTMLParser + + +def init(CONFIG, logger): + logger.setLevel(logging.DEBUG) + return Mod(CONFIG, logger) + + +class Mod: + def __init__(self, CONFIG, logging): + self.URL = "https://www.e-license.jp/el25/pc/" + self.CONFIG = CONFIG + + self.logging = logging + self.targets = [] + + self.ss = self.authenticate() + + def cycle(self): + res = self.ss.post( + url = self.URL+"p04a.action", + data = { + "b.processCd": "A", + "b.kamokuCd" : 0, + "b.page" : 1, + "b.schoolCd" : self.CONFIG["school"], + }) + res.encoding = "shift_jis" + + parser = ReservationListParser() + parser.feed(res.text) + + return + + def watch(self, date, slots): + self.logging.debug("watch request accepted: {} {}".format(+date+slots)) + self.targets.append((date, slots)) + + def authenticate(self): + ss = requests.Session() + try: + res = ss.post( + url = self.URL+"p01a.action", + data = { + "b.studentId" : self.CONFIG["username"], + "b.password" : self.CONFIG["password"], + "b.schoolCd" : self.CONFIG["school"], + "b.processCd" : "", + "b.kamokuCd" : "", + "method:doLogin" : "ログイン".encode("shift_jis"), + }) + self.logging.info("authenticated") + return ss + except Exception as e: + self.logging.error("authentication failure ({})".format(e)) + return None + + +class ReservationListParser(HTMLParser): + path = [] + + def handle_starttag(self, tag, attrs): + self.path.append(tag) + + c = [x[1] for x in attrs if x[0] == "class"] + if len(c) == 0: + return + + if tag == "td": + print(c) + return + + def handle_data(self, data): + return + + def handle_endtag(self, tag): + self.path.pop()