feat: enhance argument capability

This commit is contained in:
falsycat 2024-08-24 16:50:37 +09:00
parent 06903c89e4
commit 03bb7edac3
5 changed files with 127 additions and 197 deletions

View File

@ -3,73 +3,50 @@ import sqlite3
import sys import sys
from . import args from . import args
from . import calc
from . import db from . import db
from . import parser from . import parser
from . import util
from .ctx import Context
dbconn = None
dbcur = None
ctx = None
try: try:
parg = args.create()
try: # parsing args try: # parsing args
params = args.parse(sys.argv[1:]) params = parg.parse_args(sys.argv[1:])
if params.actions is None:
params.actions = []
except Exception as e: except Exception as e:
args.print_help() parg.print_help()
raise Exception("failure while parsing args", e) raise Exception("failure while parsing args", e)
try: # initializing DB try: # initializing DB
dbconn = sqlite3.connect(params.dbpath) dbconn = sqlite3.connect(params.dbpath)
dbcur = dbconn.cursor() dbcur = dbconn.cursor()
db.try_init(dbcur) db.try_init(dbcur)
dbcur.execute("BEGIN;")
except Exception as e: except Exception as e:
raise Exception("failure while initializing DB", e) raise Exception("failure while initializing DB", e)
if not sys.stdin.isatty(): ctx = Context(dbcur)
try: # insert new records for action in enumerate(params.actions):
for tx in parser.parse(sys.stdin): try:
db.apply(dbcur, tx) action[1](ctx)
except Exception as e: except Exception as e:
raise Exception("failure while inserting new records", e) raise Exception(f"failure while executing {action[0]}-th action", e)
try: # writing outputs
calc.call(
calc.balancesheet,
params.balancesheet,
dbcur,
)
calc.call(
calc.plstatement,
util.get_or(params.plstatement, 0),
dbcur,
begin=util.get_or(params.plstatement, 1),
end =util.get_or(params.plstatement, 2),
)
calc.call(
calc.balancetransition,
params.balancetransition,
dbcur,
)
calc.call(
calc.pltransition,
util.get_or(params.pltransition, 0),
dbcur,
datefmt=util.get_or(params.pltransition, 1),
)
except Exception as e:
raise Exception("failure while calculating output", e)
except Exception as e: except Exception as e:
print(f"bookeeper: error: {e}", file=sys.stderr) print(f"bookeeper: error: {e}", file=sys.stderr)
dbcur and dbcur.execute("ROLLBACK;")
else:
if params.permanentize:
dbcur.execute("COMMIT;")
else:
dbcur.execute("ROLLBACK;")
finally: finally:
if ctx:
ctx.finalize()
if dbcur: if dbcur:
dbcur.close() dbcur.close()
if dbconn: if dbconn:
dbconn.commit(); dbconn.commit();
dbconn.close() dbconn.close()

View File

@ -1,55 +1,68 @@
import argparse import argparse
_parser = argparse.ArgumentParser( def create():
p = argparse.ArgumentParser(
prog="bookeeper", prog="bookeeper",
description="CLI app for bookkeeping", description="CLI app for bookkeeping",
epilog="developed by falsycat <me@falsy.cat>", epilog="developed by falsycat <me@falsy.cat>",
) )
_parser.add_argument( p.add_argument(
"dbpath", "dbpath",
help="path to sqlite database file", help="path to sqlite database file",
nargs="?", nargs="?",
default=":memory:", default=":memory:",
) )
_parser.add_argument( p.add_argument(
"-p", "--permanentize", "-p", "--permanentize",
help="apply changes", help="apply changes",
action="store_true", nargs=1,
dest="permanentize", action=_PermanentizeAction,
) dest="actions",
)
p.add_argument(
"-r", "--read",
help="reads all records from the file",
nargs=1,
action=_ReadRecordsAction,
dest="actions",
)
p.add_argument(
"-o", "--output",
help="changes output file for latter actions",
nargs=1,
action=_ChangeOutputAction,
dest="actions",
)
p.add_argument(
"--sql",
help="executes SQL command",
nargs=1,
action=_SqlAction,
dest="actions",
)
return p
_parser.add_argument( class _PermanentizeAction(argparse.Action):
"-bs", "--balancesheet", def __call__(self, parser, ns, values, option_string=None):
help="emit balancesheet", _push_action(ns, lambda x: x.permanentize())
action="store",
dest="balancesheet",
)
_parser.add_argument(
"-bt", "--balance-transition",
help="emit daily transition of balance",
action="store",
dest="balancetransition",
)
_parser.add_argument(
"-pl", "--pl-statement",
help="emit P/L statement (filepath, begin, end)",
action="store",
nargs="+",
dest="plstatement",
)
_parser.add_argument(
"-plt", "--pl-transition",
help="emit transition of P/L",
action="store",
nargs="+",
dest="pltransition",
)
def print_help(): class _ReadRecordsAction(argparse.Action):
_parser.print_help() def __call__(self, parser, ns, values, option_string=None):
_push_action(ns, lambda x: x.read_records(values[0]))
def parse(args): class _ChangeOutputAction(argparse.Action):
ret = _parser.parse_args(args) def __call__(self, parser, ns, values, option_string=None):
return ret _push_action(ns, lambda x: x.change_output(values[0]))
class _SqlAction(argparse.Action):
def __call__(self, parser, ns, values, option_string=None):
_push_action(ns, lambda x: x.sql(values[0]))
def _push_action(ns, act):
actions = getattr(ns, "actions", None)
if actions is None:
actions = []
setattr(ns, "actions", actions)
actions.append(act)

View File

@ -1,101 +0,0 @@
import sys
def call(func, path, *args, **kwargs):
if path == "-":
func(sys.stdout, *args, **kwargs)
elif path is None:
pass
else:
with open(path, "w") as f:
func(f, *args, **kwargs)
def balancesheet(stream, cur):
cur.execute("""
SELECT
entry.type,
entry.name,
SUM(journal.amount)
FROM journal RIGHT JOIN entry ON journal.entry=entry.id
WHERE entry.type in ("A","D","N")
GROUP BY entry.name;
""")
for row in cur:
print(_recover_entry_name(row[0], row[1]), row[2], file=stream, sep=",")
def plstatement(stream, cur, begin=None, end=None):
if begin is None:
begin = "0000-00-00";
if end is None:
end = "9999-12-31";
cur.execute("""
SELECT
entry.type,
entry.name,
SUM(journal.amount)
FROM journal
RIGHT JOIN entry ON journal.entry=entry.id
RIGHT JOIN tx ON journal.tx =tx.id
WHERE
entry.type in ("E","R") AND tx.date BETWEEN ? AND ?
GROUP BY entry.name;
""", (begin, end))
for row in cur:
print(_recover_entry_name(row[0], row[1]), row[2], file=stream, sep=",")
def balancetransition(stream, cur):
cur.execute("""
SELECT
date,
SUM(sum)
OVER(ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
FROM (
SELECT
tx.date AS date,
SUM(
CASE WHEN entry.type="A" THEN journal.amount ELSE -journal.amount END
) AS sum
FROM journal
RIGHT JOIN entry ON journal.entry=entry.id
RIGHT JOIN tx ON journal.tx =tx.id
WHERE entry.type in ("A","D","N")
GROUP BY tx.date
);
""")
for row in cur:
print(*row, file=stream, sep=",")
def pltransition(stream, cur, datefmt=None):
if datefmt is None:
datefmt = "%Y-%m-%d"
cur.execute("""
WITH temp AS (
SELECT
STRFTIME(?, tx.date) AS period,
entry.type AS type,
SUM(amount) AS sum
FROM journal
RIGHT JOIN entry ON journal.entry=entry.id
RIGHT JOIN tx ON journal.tx =tx.id
WHERE entry.type in ("E","R")
GROUP BY period, entry.type
)
SELECT
a.period AS period,
a.sum AS expense,
IFNULL(b.sum, 0) AS revenue
FROM temp AS a LEFT JOIN temp AS b ON a.period=b.period AND b.type="R"
WHERE a.type="E";
""", [datefmt])
for row in cur:
print(*row, file=stream, sep=",")
def _recover_entry_name(t: str, name: str):
m = {
"A": "asset",
"D": "debt",
"N": "net",
"E": "expense",
"R": "revenue",
}
return f"{m[t]}/{name}"

46
bookeeper/ctx.py Normal file
View File

@ -0,0 +1,46 @@
import io
import sqlite3
import sys
from . import db
from . import parser
class Context:
def __init__(self, dbcur: sqlite3.Cursor):
self.ostream = sys.stdout
self.dbcur = dbcur
self.dbcur.execute("BEGIN;")
def finalize(self):
self.dbcur.execute("ROLLBACK;")
self.close_output()
def permanentize(self):
self.dbcur.execute("COMMIT;")
self.dbcur.execute("BEGIN;")
def read_records(self, path):
def parse(st):
for tx in parser.parse(st):
db.apply(self.dbcur, tx)
if path == "-":
parse(sys.stdin)
else:
with open(path) as f:
parse(f)
def change_output(self, path):
self.close_output()
if path != "-":
self.ostream = open(path, "w")
def close_output(self):
if self.ostream is not sys.stdout:
self.ostream.close()
self.ostream = sys.stdout
def sql(self, cmd: str):
self.dbcur.execute(cmd)
for row in self.dbcur:
print(*row, file=self.ostream)

View File

@ -1,5 +0,0 @@
def get_or(a: list, idx: int, default = None):
try:
return a[idx]
except:
return default