Source code for finalynx.fetch.source_realt

import json
import re

import requests
from rich.tree import Tree

from ..config import get_active_theme as TH
from ..console import console
from .source_base_line import SourceBaseLine


GNOSIS_API_TOKENLIST_URI = "https://blockscout.com/xdai/mainnet/api?module=account&action=tokenlist&address="
REALT_API_TOKENLIST_URI = "https://api.realt.community/v1/token"


[docs]class SourceRealT(SourceBaseLine): def __init__(self, wallet_address: str, name: str = "RealT", cache_validity: int = 120) -> None: """RealT wrapper to fetch an address' investments. :param wallet_address: Your wallet address. :param name: Set this source's name, can be changed when using multiple RealT sources for multiple RealT addresses. :param cache_validity: Finalynx will save fetched results to a file and reuse them on the next run if the cache age is less than the specified number of hours. """ super().__init__(name, cache_validity) # cache is valid for 5 days self.wallet_address = wallet_address
[docs] def _fetch_data(self, tree: Tree) -> None: """Get investments from RealT and match them with information found on the specified wallet.""" with console.status(f"[bold {TH().ACCENT}]Fetching data from {self.name}...", spinner_style=TH().ACCENT): # Todo optimize API call with API key and/or cached file # Get list of all Realtoken info needed from RealT realt_tokenlist = json.loads(requests.get(REALT_API_TOKENLIST_URI).text) realt_tokeninfo = {} for item in realt_tokenlist: realt_tokeninfo.update( { item["uuid"].lower(): { "fullName": item["fullName"], "shortName": item["shortName"], "tokenPrice": item["tokenPrice"], "uuid": item["uuid"], } } ) # Get list of token own from Gnosis address gnosis_tokenlist = json.loads(requests.get(GNOSIS_API_TOKENLIST_URI + self.wallet_address).text) # Display the lines found to the console, you can create a nested tree if you want node = tree.add("RealT") # Register the real investment information, will be cached and matched to the portfolio for item in gnosis_tokenlist["result"]: address = str(item["contractAddress"]) try: amount = float(item["balance"]) / pow(10, int(item["decimals"])) if re.match(r"^REALTOKEN", str(item["symbol"])): info = realt_tokeninfo[address.lower()] self._register_fetchline( tree_node=node, name=info["shortName"], id=info["uuid"], account=self.name, amount=amount * info["tokenPrice"], currency="$", ) if re.match(r"^armmR", str(item["symbol"])): original_contract_address = json.loads(requests.get(GNOSIS_API_TOKENLIST_URI + address).text) key = str(original_contract_address["result"][0]["contractAddress"]).lower() self._register_fetchline( tree_node=node, name=realt_tokeninfo[key]["shortName"], id=realt_tokeninfo[key]["uuid"], account=self.name, amount=amount * realt_tokeninfo[key]["tokenPrice"], currency="$", ) except Exception as e: self._log(f"[yellow][bold]Warning:[/] failed to parse line '{address}', skipping ({e})")