"""Mastodon bot that polls for mentions and handles simple text commands.

Run as a script: it reads ``config.json`` from the working directory,
authenticates (caching credentials in ``auth.json``) and then polls for
mention notifications forever.

Keys read from ``config.json``: ``instance``, ``username``, ``password``,
``master`` and ``interval``.
"""

import json
import os
import time
from datetime import datetime, timezone
from html.parser import HTMLParser

from mastodon import Mastodon


class CommandError(Exception):
    """Raised when a mention does not contain a usable command."""


class Handler:
    """Parse one mention status and execute the command it contains.

    All work happens in the constructor: the status HTML is reduced to plain
    text, the first whitespace-separated token is dropped, and the remaining
    tokens are dispatched as a command.
    """

    def __init__(self, app, st):
        """Parse ``st["content"]`` and immediately handle the command.

        Args:
            app: The owning ``App``; used for its ``CONFIG`` and ``reply``.
            st: The status dict; ``"content"`` and ``"created_at"`` are read.

        Raises:
            Exception: Propagated from ``handle`` when the command is invalid.
        """
        self.app = app
        self.st = st
        self.text = sanitize_html(st["content"])
        # The first token is discarded (presumably the leading @mention).
        self.tokens = self.text.split()[1:]
        self.handle()

    def handle(self):
        """Dispatch on the first token.

        Raises:
            CommandError: If there is no command or it is not recognised.
        """
        print(self.tokens)
        if not self.tokens:
            # Previously surfaced as "list index out of range".
            raise CommandError("empty command")
        if self.tokens[0] == "watch":
            self.watch()
        else:
            raise CommandError("unknown command")

    def watch(self):
        """Handle ``watch <date> [slot ...]``.

        Replies "accepted" only when the status is younger than the
        configured polling interval.

        Raises:
            CommandError: If the date token is missing.
            ValueError: If a slot token is not an integer.
        """
        if len(self.tokens) < 2:
            raise CommandError("watch requires a date")
        date = self.tokens[1]
        slots = [int(x) for x in self.tokens[2:]]
        elapsed = datetime.now(timezone.utc) - self.st["created_at"]
        # total_seconds() covers the whole duration; timedelta.seconds is only
        # the sub-day component and would make day-old statuses look fresh.
        if elapsed.total_seconds() < self.app.CONFIG["interval"]:
            self.app.reply(self.st, "accepted")
        print("watching", date, slots)


class App:
    """The bot: loads configuration, authenticates and runs the poll loop."""

    def __init__(self):
        """Load ``config.json``, authenticate, then call ``run``.

        ``run`` loops forever, so constructing an ``App`` does not return.
        """
        with open("config.json", "r", encoding="utf-8") as f:
            self.CONFIG = json.load(f)
        self.M = self.authenticate()
        self.run()

    def reply(self, st, msg):
        """Send ``msg`` in response to status ``st``.

        Posting is currently disabled, so the message is only printed.
        """
        try:
            print(msg)
            # NOTE: posting is disabled; the call is kept for re-enabling.
            # self.M.status_post(
            #     status="@{} {}".format(st["account"]["acct"], msg),
            #     in_reply_to_id=st["id"])
        except Exception as e:
            print("reply failure:", msg, "(", e, ")")

    def run(self):
        """Poll for mention notifications forever.

        Each cycle fetches notifications for the ``master`` account, runs a
        ``Handler`` on each status, dismisses the notification, and then
        sleeps for ``interval`` seconds. A failure while handling one item is
        reported via ``reply``; any other failure aborts only that cycle.
        """
        while True:
            try:
                items = self.M.notifications(
                    account_id=self.CONFIG["master"],
                    # Mastodon.py documents ``types`` as a list of type names.
                    types=["mention"])
                for item in items:
                    st = item["status"]
                    try:
                        Handler(self, st)
                    except Exception as e:
                        self.reply(st, "handling aborted: {}".format(e))
                    # Dismiss whether or not handling succeeded.
                    self.M.notifications_dismiss(id=item["id"])
                print("cycle done")
            except Exception as e:
                # Top-level boundary: report and try again next cycle.
                print("cycle aborted:", e)
            time.sleep(self.CONFIG["interval"])

    def authenticate(self):
        """Return an authenticated ``Mastodon`` client.

        If ``auth.json`` exists its stored client id, secret and token are
        used. Otherwise an app named "maidbot" is registered, the configured
        user is logged in, and the credentials are written to ``auth.json``.
        """
        if os.path.exists("auth.json"):
            with open("auth.json", "r", encoding="utf-8") as f:
                auth = json.load(f)
            M = Mastodon(
                api_base_url=self.CONFIG["instance"],
                client_id=auth["client_id"],
                client_secret=auth["secret"],
                access_token=auth["token"])
        else:
            cid, secret = Mastodon.create_app(
                client_name="maidbot",
                api_base_url=self.CONFIG["instance"])
            M = Mastodon(
                client_id=cid,
                client_secret=secret,
                api_base_url=self.CONFIG["instance"])
            tok = M.log_in(
                username=self.CONFIG["username"],
                password=self.CONFIG["password"])
            dic = {"client_id": cid, "secret": secret, "token": tok}
            with open("auth.json", "w", encoding="utf-8") as f:
                json.dump(dic, f, indent=2)
        return M


class _TextExtractor(HTMLParser):
    """Collect text content, turning ``<br>`` tags into single spaces."""

    def __init__(self):
        super().__init__()
        self.parts = []

    def handle_starttag(self, tag, attrs):
        if tag == "br":
            self.parts.append(" ")

    def handle_data(self, data):
        self.parts.append(data)


def sanitize_html(html):
    """Return the text content of ``html`` with all tags removed.

    ``<br>`` becomes a single space; every other tag is dropped without
    inserting whitespace.
    """
    parser = _TextExtractor()
    parser.feed(html)
    # close() flushes text the parser may still be buffering, e.g. input that
    # ends in an unterminated "&".
    parser.close()
    return "".join(parser.parts)


if __name__ == "__main__":
    App()