creates new project

This commit is contained in:
falsycat 2024-08-17 12:39:08 +09:00
parent 9542f9b26d
commit 06903c89e4
7 changed files with 366 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
__pycache__/

75
bookeeper/__main__.py Normal file
View File

@ -0,0 +1,75 @@
import os
import sqlite3
import sys
from . import args
from . import calc
from . import db
from . import parser
from . import util
try:
try: # parsing args
params = args.parse(sys.argv[1:])
except Exception as e:
args.print_help()
raise Exception("failure while parsing args", e)
try: # initializing DB
dbconn = sqlite3.connect(params.dbpath)
dbcur = dbconn.cursor()
db.try_init(dbcur)
dbcur.execute("BEGIN;")
except Exception as e:
raise Exception("failure while initializing DB", e)
if not sys.stdin.isatty():
try: # insert new records
for tx in parser.parse(sys.stdin):
db.apply(dbcur, tx)
except Exception as e:
raise Exception("failure while inserting new records", e)
try: # writing outputs
calc.call(
calc.balancesheet,
params.balancesheet,
dbcur,
)
calc.call(
calc.plstatement,
util.get_or(params.plstatement, 0),
dbcur,
begin=util.get_or(params.plstatement, 1),
end =util.get_or(params.plstatement, 2),
)
calc.call(
calc.balancetransition,
params.balancetransition,
dbcur,
)
calc.call(
calc.pltransition,
util.get_or(params.pltransition, 0),
dbcur,
datefmt=util.get_or(params.pltransition, 1),
)
except Exception as e:
raise Exception("failure while calculating output", e)
except Exception as e:
print(f"bookeeper: error: {e}", file=sys.stderr)
dbcur and dbcur.execute("ROLLBACK;")
else:
if params.permanentize:
dbcur.execute("COMMIT;")
else:
dbcur.execute("ROLLBACK;")
finally:
if dbcur:
dbcur.close()
if dbconn:
dbconn.commit();
dbconn.close()

55
bookeeper/args.py Normal file
View File

@ -0,0 +1,55 @@
import argparse
_parser = argparse.ArgumentParser(
prog="bookeeper",
description="CLI app for bookkeeping",
epilog="developed by falsycat <me@falsy.cat>",
)
_parser.add_argument(
"dbpath",
help="path to sqlite database file",
nargs="?",
default=":memory:",
)
_parser.add_argument(
"-p", "--permanentize",
help="apply changes",
action="store_true",
dest="permanentize",
)
_parser.add_argument(
"-bs", "--balancesheet",
help="emit balancesheet",
action="store",
dest="balancesheet",
)
_parser.add_argument(
"-bt", "--balance-transition",
help="emit daily transition of balance",
action="store",
dest="balancetransition",
)
_parser.add_argument(
"-pl", "--pl-statement",
help="emit P/L statement (filepath, begin, end)",
action="store",
nargs="+",
dest="plstatement",
)
_parser.add_argument(
"-plt", "--pl-transition",
help="emit transition of P/L",
action="store",
nargs="+",
dest="pltransition",
)
def print_help():
_parser.print_help()
def parse(args):
ret = _parser.parse_args(args)
return ret

101
bookeeper/calc.py Normal file
View File

@ -0,0 +1,101 @@
import sys
def call(func, path, *args, **kwargs):
if path == "-":
func(sys.stdout, *args, **kwargs)
elif path is None:
pass
else:
with open(path, "w") as f:
func(f, *args, **kwargs)
def balancesheet(stream, cur):
cur.execute("""
SELECT
entry.type,
entry.name,
SUM(journal.amount)
FROM journal RIGHT JOIN entry ON journal.entry=entry.id
WHERE entry.type in ("A","D","N")
GROUP BY entry.name;
""")
for row in cur:
print(_recover_entry_name(row[0], row[1]), row[2], file=stream, sep=",")
def plstatement(stream, cur, begin=None, end=None):
if begin is None:
begin = "0000-00-00";
if end is None:
end = "9999-12-31";
cur.execute("""
SELECT
entry.type,
entry.name,
SUM(journal.amount)
FROM journal
RIGHT JOIN entry ON journal.entry=entry.id
RIGHT JOIN tx ON journal.tx =tx.id
WHERE
entry.type in ("E","R") AND tx.date BETWEEN ? AND ?
GROUP BY entry.name;
""", (begin, end))
for row in cur:
print(_recover_entry_name(row[0], row[1]), row[2], file=stream, sep=",")
def balancetransition(stream, cur):
cur.execute("""
SELECT
date,
SUM(sum)
OVER(ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
FROM (
SELECT
tx.date AS date,
SUM(
CASE WHEN entry.type="A" THEN journal.amount ELSE -journal.amount END
) AS sum
FROM journal
RIGHT JOIN entry ON journal.entry=entry.id
RIGHT JOIN tx ON journal.tx =tx.id
WHERE entry.type in ("A","D","N")
GROUP BY tx.date
);
""")
for row in cur:
print(*row, file=stream, sep=",")
def pltransition(stream, cur, datefmt=None):
if datefmt is None:
datefmt = "%Y-%m-%d"
cur.execute("""
WITH temp AS (
SELECT
STRFTIME(?, tx.date) AS period,
entry.type AS type,
SUM(amount) AS sum
FROM journal
RIGHT JOIN entry ON journal.entry=entry.id
RIGHT JOIN tx ON journal.tx =tx.id
WHERE entry.type in ("E","R")
GROUP BY period, entry.type
)
SELECT
a.period AS period,
a.sum AS expense,
IFNULL(b.sum, 0) AS revenue
FROM temp AS a LEFT JOIN temp AS b ON a.period=b.period AND b.type="R"
WHERE a.type="E";
""", [datefmt])
for row in cur:
print(*row, file=stream, sep=",")
def _recover_entry_name(t: str, name: str):
m = {
"A": "asset",
"D": "debt",
"N": "net",
"E": "expense",
"R": "revenue",
}
return f"{m[t]}/{name}"

49
bookeeper/db.py Normal file
View File

@ -0,0 +1,49 @@
import sqlite3
def try_init(cur):
cur.execute("""
CREATE TABLE IF NOT EXISTS entry (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type VARCHAR(1) NOT NULL CHECK(type IN ("A","D","N","E","R")),
name TEXT NOT NULL,
UNIQUE (type, name)
);
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS tx (
id INTEGER PRIMARY KEY AUTOINCREMENT,
summary TEXT,
date DATE NOT NULL,
time TIME
);
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS journal (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tx INTEGER NOT NULL,
entry INTEGER NOT NULL,
amount INTEGER NOT NULL,
FOREIGN KEY(tx) REFERENCES tx(id),
FOREIGN KEY(entry) REFERENCES entry(name)
);
""")
def apply(cur, tx):
cur.execute("""
INSERT INTO tx(summary, date, time) VALUES (?, ?, ?) RETURNING id
""", (tx.summary, tx.date, tx.time))
txid = next(cur)[0]
for j in tx.journals:
cur.execute("""
INSERT INTO entry(type, name) VALUES (?, ?)
ON CONFLICT DO UPDATE SET id=id
RETURNING id
""", j[0:2])
eid = next(cur)[0]
cur.execute("""
INSERT INTO journal(tx, entry, amount) VALUES (?, ?, ?)
""", (txid, eid, j[2]))

80
bookeeper/parser.py Normal file
View File

@ -0,0 +1,80 @@
import re
import string
from dataclasses import dataclass, field
_RE_DATETIME = re.compile(r"(\d{4}-\d{2}-\d{2})(?:(@\d{2}:\d{2}))?")
@dataclass
class Tx:
date : str = ""
time : str|None = None
summary : str = ""
journals: list[tuple[str, int]] = field(default_factory=lambda: [])
def parse(stream):
parts = []
end = False
line_num = 0
while not end:
line_num += 1
line = stream.readline()
end = ("" == line)
if end or not line[0].isspace():
if tx := _make_tx(parts):
yield tx
parts = []
comment = line.find("#")
if comment >= 0:
line = line[0:comment]
line = line.strip()
if line != "":
parts.append((line_num, line))
def _make_tx(lines: list[tuple[int, str]]) -> dict:
if len(lines) < 2:
return None
tx = Tx()
line1 = lines[0][1].split(maxsplit=1)
if m := re.fullmatch(_RE_DATETIME, line1[0]):
tx.date = m.group(1)
tx.time = m.group(2)
else:
raise Exception(f"invalid datetime format: '{line1[0]}' ({lines[0][1]})")
if len(line1) == 2:
tx.summary = line1[1]
for line in lines[1:]:
tokens = line[1].split()
if len(tokens) != 2:
raise Exception("invalid journal: '{line[1]}' (line[0])")
t, name = _parse_entry_name(tokens[0])
tx.journals.append((t, name, int(tokens[1])))
return tx
def _parse_entry_name(name: str) -> tuple[str, str]:
tokens = name.split("/", maxsplit=1)
if len(tokens) != 2:
raise Exception(f"invalid entry name: {name}")
if "asset" == tokens[0]:
t = "A"
elif "debt" == tokens[0]:
t = "D"
elif "net" == tokens[0]:
t = "N"
elif "expense" == tokens[0]:
t = "E"
elif "revenue" == tokens[0]:
t = "R"
else:
raise Exception(f"invalid entry type: {tokens[0]}")
return (t, tokens[1])

5
bookeeper/util.py Normal file
View File

@ -0,0 +1,5 @@
def get_or(a: list, idx: int, default = None):
try:
return a[idx]
except:
return default