Source code for finalynx.budget.budget

from datetime import datetime
from pathlib import Path
from typing import List
from typing import Optional
from typing import TYPE_CHECKING
from typing import Union

import gspread
from rich.prompt import Confirm
from rich.table import Table
from rich.tree import Tree

from ..config import get_active_theme as TH
from ..console import console
from .expense import Constraint
from .expense import Expense
from .expense import Period
from .expense import Status
from .source_n26 import SourceN26

# noreorder
from ._render import _render_expenses_table
from ._review import _i_paid, _payback, _constraint, _period, _comment, _status  # noqa: F401

if TYPE_CHECKING:
    from gspread.worksheet import Worksheet


[docs]class Budget: MAX_DISPLAY_ROWS = 10 def __init__(self, service_account_path: Union[str, Path, None] = None) -> None: # Google Sheet that serves as the database of expenses, will be connected later self._sheet: Optional[Worksheet] = None # Initialize the list of expenses, will be fetched later self.expenses: List[Expense] = [] self.n_new_expenses: int = 0 self.balance: float = 0.0 # Private copy that only includes expenses that need user review (calculated only once) self._pending_expenses: List[Expense] = [] # Path to the Google Sheets token file, defaults to the OS's default directory self._gspread_token_path = ( Path(service_account_path) if service_account_path else gspread.auth.DEFAULT_SERVICE_ACCOUNT_FILENAME )
[docs] def set_gspread_token_path(self, path: Union[str, Path]) -> None: """Set the path to the Google Sheets token file, defaults to the OS's default directory.""" self._gspread_token_path = Path(path)
[docs] def fetch(self, clear_cache: bool, force_signin: bool = False) -> Tree: """Get expenses from all sources and return a rich tree to summarize the results. This method also updates the google sheets table with the newly found expenses and prepares the list of "pending" expenses that need user reviews.""" # Connect to the Google Sheet that serves as the database of expenses with console.status(f"[bold {TH().ACCENT}]Connecting to Google Sheets...", spinner_style=TH().ACCENT): try: gs = gspread.service_account(filename=self._gspread_token_path) sh = gs.open("Finalynx Expenses") self._sheet = sh.worksheet("Sheet1") except Exception as e: # noqa console.log( "[red][bold]Error:[/] Couldn't connect to GSheets, have you placed your " "personal service_account.json token file in your OS's default directory?" ) # Make sure we are already connected to the source and the sheet assert self._sheet is not None, "Something went wrong with the connection to GSheets" # Fetch the latest values from the the sheet sheet_values = self._sheet.get_all_values() # Initialize the N26 client with the credentials if Confirm.ask("Fetch expenses from N26?", default=True): source = SourceN26(force_signin) tree = source.fetch(clear_cache=bool(clear_cache or force_signin)) self.balance = source.balance # Get the new expenses from the source that are not in the sheet yet last_timestamp = max([int(row[0]) for row in sheet_values if str(row[0]).isdigit()]) new_expenses = list(reversed([e for e in source.get_expenses() if e.timestamp > last_timestamp])) self.n_new_expenses = len(new_expenses) # Add the new expenses to the sheet if self.n_new_expenses > 0: first_empty_row = len(sheet_values) + 1 self._sheet.update( f"A{first_empty_row}:D{first_empty_row + len(new_expenses)}", [d.to_list()[:4] for d in new_expenses], ) # Fetch the latest values from the sheet again to get the complete list of expenses sheet_values = self._fetch_sheet_values() # TODO improve else: tree = Tree("N26 Skipped.") # From now on, we will work with the up-to-date list of expenses self.expenses = [Expense.from_list(line, i + 2) for i, line in enumerate(sheet_values[1:])] # Filter expenses to keep only the ones that are not skipped and incomplete self._pending_expenses = [ t for t in self.expenses if t.status not in [Status.SKIP, Status.DONE, Status.TODO] or t.i_paid is None ] # Return the tree summary to be displayed in the console return tree
[docs] def render_expenses(self) -> Union[Table, str]: # Make sure we are already connected to the source and the sheet assert self._pending_expenses is not None, "Call `fetch()` first" # Display the table of pending expenses n_pending = len(self._pending_expenses) if n_pending == 0: return "[green]No pending expenses 🎉" + ( f" [dim white]N26 Balance: {self.balance:.2f} €\n" if self.balance > 0.001 else "\n" ) return _render_expenses_table( self._pending_expenses[-Budget.MAX_DISPLAY_ROWS :], # noqa: E203 title=( f"{self.n_new_expenses} new expense{'s' if self.n_new_expenses != 1 else ''} ── {n_pending} need{'s' if n_pending == 1 else ''} review " + (f"(displaying {Budget.MAX_DISPLAY_ROWS} first rows)" if n_pending > Budget.MAX_DISPLAY_ROWS else "") ), caption=f"N26 Balance: {self.balance:.2f} €", )
[docs] def render_summary(self) -> Tree: """Render a summary of the budget, mainly the current and previous month's totals.""" # Make sure we are already connected to the source and the sheet assert self.expenses is not None, "Call `fetch()` first" tree = Tree("Budget", hide_root=True, guide_style=TH().HINT) def _get_monthly_expenses(month: int, year: int) -> List[Expense]: return [ e for e in self.expenses if e.as_datetime().month == month and e.as_datetime().year == year and e.status != Status.SKIP and e.period == Period.MONTHLY ] def _add_node(title: str, total: float, hint: str = "") -> Tree: return tree.add(f"[bold {TH().TEXT}]{title:<11}[/] [{TH().TEXT}]{str(total):>6} € [{TH().ACCENT}]{hint}[/]") # Get the yearly total now = datetime.now() yearly_total = sum( [ (e.i_paid if e.i_paid is not None else 0) for e in self.expenses if e.as_datetime().year == now.year and e.status != Status.SKIP and e.period == Period.YEARLY ] ) _add_node( str(now.year), round(yearly_total), hint=f" {round((yearly_total / 12)):>5} € / month", ) node = tree.add(" ") # Get each month's total expenses month_totals: List[int] = [] for i_month in range(1, now.month + 1): expenses = _get_monthly_expenses(i_month, now.year) monthly_total = round(sum([(e.i_paid if e.i_paid is not None else 0) for e in expenses])) month_totals.append(round(monthly_total + (yearly_total / 12))) node = _add_node( datetime(now.year, i_month, 1).strftime("%B"), monthly_total, f" {month_totals[-1]:>5} €", ) # Summarize the expenses by category for the last 3 months if i_month > now.month - 3: for c in [c for c in Constraint if c != Constraint.UNKNOWN]: node.add( f"[{TH().HINT}]{c.value.capitalize():<8} " f"{round(sum([(e.i_paid if e.i_paid is not None else 0) for e in expenses if e.constraint == c])):>5} €" ) tree.add(" ") mean_monthly_total = round(sum(month_totals[:-1]) / len(month_totals[:-1])) last_month_name = datetime(now.year, now.month - 1, 1).strftime("%B") delta = month_totals[-1] - mean_monthly_total tree.add(f"[{TH().TEXT}]Current delta [{'green' if delta > 0 else 'red'}][bold]{delta:>12} €[/]\n") tree.add( f"[{TH().TEXT}]Mean up to {last_month_name:<9} [{TH().ACCENT}][bold]{mean_monthly_total:>5} €[/] / month" ) return tree
[docs] def interactive_review(self) -> None: """Review the list of pending expenses one by one, and update the sheet with the new values. This method is interactive, and will clear the console between each expense or when the user presses Ctrl+C.""" assert self._sheet is not None, "Call fetch() first" assert self._pending_expenses is not None, "Call `fetch()` first" if not self._pending_expenses: console.print("[green]You're all done with your expenses! 💸") return # Make space so that the main table is not hidden by the next console clears console.print("\n" * console.height) console.clear() # Review pending expenses in reverse (most recent first) for convenience review_expenses = self._pending_expenses[::-1] # For each expense, ask the user to set each field to classify the expense try: for i, t in enumerate(review_expenses): _skip_line = False for method in [ _i_paid, _payback, _constraint, _period, _comment, _status, ]: result = method(review_expenses, i) if result is None: console.clear() return elif result is False: _skip_line = True break if not _skip_line: with console.status(f"[bold {TH().ACCENT}]Saving...", spinner_style=TH().ACCENT): self._sheet.update(f"A{t.cell_number}:J{t.cell_number}", [t.to_list()]) console.clear() console.print("[bold]All done![/] 🎉") except KeyboardInterrupt: console.clear()
[docs] def _fetch_sheet_values(self) -> List[List[str]]: """Get the latest values from the Google Sheet.""" assert self._sheet is not None, "Call connect() first" with console.status( f"[bold {TH().ACCENT}]Fetching previous expenses from Google Sheets...", spinner_style=TH().ACCENT, ): return self._sheet.get_all_values() # type: ignore