implement new mod to make a reserve on e-license

This commit is contained in:
falsycat 2023-02-11 23:55:39 +09:00
parent 6bde533161
commit 5c866c3b71
3 changed files with 141 additions and 36 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
__pycache__/
/config.json
/auth.json

95
main.py
View File

@ -1,3 +1,4 @@
import logging
import json
import os
import time
@ -6,11 +7,14 @@ from datetime import datetime, timezone
from html.parser import HTMLParser
from mastodon import Mastodon
import mods.elicense
class Handler:
def __init__(self, app, st):
self.app = app
self.st = st
self.logging = logging.getLogger("app.handler")
self.app = app
self.st = st
self.text = sanitize_html(st["content"])
self.tokens = self.text.split()[1:]
@ -18,7 +22,6 @@ class Handler:
self.handle()
def handle(self):
print(self.tokens)
if self.tokens[0] == "watch":
self.watch()
else:
@ -32,45 +35,24 @@ class Handler:
if elapsed.seconds < self.app.CONFIG["interval"]:
self.app.reply(self.st, "accepted")
print("watching", date, slots)
mod = self.app.mods["elicense"]
mod.watch(self.st["id"], date, slots)
class App:
def __init__(self):
logging.basicConfig()
self.logging = logging.getLogger("app")
self.logging.setLevel(logging.DEBUG)
with open("config.json", "r") as f:
self.CONFIG = json.load(f)
self.logging.info("configuration loaded")
self.M = self.authenticate()
self.run()
self.logging.info("authentication succeeded")
def reply(self, st, msg):
try:
print(msg)
#self.M.status_post(
# status = "@{} {}".format(st["account"]["acct"], msg),
# in_reply_to_id = st["id"])
except Exception as e:
print("reply failure:", msg, "(", e, ")")
def run(self):
while True:
try:
items = self.M.notifications(
account_id = self.CONFIG["master"],
types = "mention")
for item in items:
st = item["status"]
# handle the item
try:
Handler(self, st)
except Exception as e:
self.reply(st, "handling aborted: {}".format(e))
self.M.notifications_dismiss(id = item["id"])
print("cycle done")
except Exception as e:
print("cycle aborted:", e)
time.sleep(self.CONFIG["interval"])
return
self.mods = {}
def authenticate(self):
if os.path.exists("auth.json"):
@ -100,9 +82,50 @@ class App:
dic = {"client_id": cid, "secret": secret, "token": tok}
with open("auth.json", "w") as f:
json.dump(dic, f, indent = 2)
return M
def install(self, name, mod):
self.logging.info("mod installed, "+name)
logger = logging.getLogger("mod."+name)
self.mods[name] = mod.init(self.CONFIG["mods"][name], logger)
def run(self):
self.install("elicense", mods.elicense)
while True:
try:
self.cycle()
self.logging.debug("cycle done")
except Exception as e:
self.logging.warning("cycle aborted:", e)
for mod in self.mods:
self.mods[mod].cycle()
time.sleep(self.CONFIG["interval"])
def cycle(self):
items = self.M.notifications(
account_id = self.CONFIG["master"],
types = "mention")
for item in items:
st = item["status"]
try:
Handler(self, st)
except Exception as e:
self.reply(st, "handling aborted: {}".format(e))
self.M.notifications_dismiss(id = item["id"])
# TODO: call mod cycles
def reply(self, st, msg):
try:
self.logging.debug("reply:", msg)
#self.M.status_post(
# status = "@{} {}".format(st["account"]["acct"], msg),
# in_reply_to_id = st["id"])
except Exception as e:
self.logging.error("reply failure:", msg, "(", e, ")")
def sanitize_html(html):
class F(HTMLParser):
@ -115,4 +138,4 @@ def sanitize_html(html):
f.feed(html)
return f.text
App()
App().run()

81
mods/elicense.py Normal file
View File

@ -0,0 +1,81 @@
import logging
import json
import requests
from html.parser import HTMLParser
def init(CONFIG, logger):
logger.setLevel(logging.DEBUG)
return Mod(CONFIG, logger)
class Mod:
def __init__(self, CONFIG, logging):
self.URL = "https://www.e-license.jp/el25/pc/"
self.CONFIG = CONFIG
self.logging = logging
self.targets = []
self.ss = self.authenticate()
def cycle(self):
res = self.ss.post(
url = self.URL+"p04a.action",
data = {
"b.processCd": "A",
"b.kamokuCd" : 0,
"b.page" : 1,
"b.schoolCd" : self.CONFIG["school"],
})
res.encoding = "shift_jis"
parser = ReservationListParser()
parser.feed(res.text)
return
def watch(self, date, slots):
self.logging.debug("watch request accepted: {} {}".format(+date+slots))
self.targets.append((date, slots))
def authenticate(self):
ss = requests.Session()
try:
res = ss.post(
url = self.URL+"p01a.action",
data = {
"b.studentId" : self.CONFIG["username"],
"b.password" : self.CONFIG["password"],
"b.schoolCd" : self.CONFIG["school"],
"b.processCd" : "",
"b.kamokuCd" : "",
"method:doLogin" : "ログイン".encode("shift_jis"),
})
self.logging.info("authenticated")
return ss
except Exception as e:
self.logging.error("authentication failure ({})".format(e))
return None
class ReservationListParser(HTMLParser):
path = []
def handle_starttag(self, tag, attrs):
self.path.append(tag)
c = [x[1] for x in attrs if x[0] == "class"]
if len(c) == 0:
return
if tag == "td":
print(c)
return
def handle_data(self, data):
return
def handle_endtag(self, tag):
self.path.pop()