Source code for payments.coin_handlers.SteemEngine.SteemEngineLoader

"""
**Copyright**::

    +===================================================+
    |                 © 2019 Privex Inc.                |
    |               https://www.privex.io               |
    +===================================================+
    |                                                   |
    |        CryptoToken Converter                      |
    |                                                   |
    |        Core Developer(s):                         |
    |                                                   |
    |          (+)  Chris (@someguy123) [Privex]        |
    |                                                   |
    +===================================================+

"""

import logging
from decimal import Decimal, getcontext, ROUND_DOWN
from time import sleep
from typing import Generator, Iterable, List

from dateutil.parser import parse
from django.core.cache import cache
from privex.steemengine import SETransaction

from payments.coin_handlers.SteemEngine.SteemEngineMixin import SteemEngineMixin
from payments.coin_handlers.base.BaseLoader import BaseLoader
from payments.models import Coin
from privex.helpers import convert_datetime
from steemengine.helpers import empty

log = logging.getLogger(__name__)


class SteemEngineLoader(BaseLoader, SteemEngineMixin):
    """
    This class handles loading transactions for the **SteemEngine** network, and can support almost any token
    on SteemEngine.

    **Copyright**::

        +===================================================+
        |                 © 2019 Privex Inc.                |
        |               https://www.privex.io               |
        +===================================================+
        |                                                   |
        |        CryptoToken Converter                      |
        |                                                   |
        |        Core Developer(s):                         |
        |                                                   |
        |          (+)  Chris (@someguy123) [Privex]        |
        |                                                   |
        +===================================================+

    """

    provides = []  # type: List[str]
    """
    This attribute is automatically generated by scanning for :class:`models.Coin` s with the type ``steemengine``.
    This saves us from hard coding specific coin symbols. See __init__.py for populating code.
    """

    def __init__(self, symbols):
        """
        Initialise the loader for the given coin symbols.

        :param symbols: Coin symbols to handle; passed straight through to the parent constructor.
        """
        # NOTE(review): these two look like RPC caches consumed by SteemEngineMixin.get_rpc - confirm there.
        self._eng_rpc = None
        self._eng_rpcs = {}
        super(SteemEngineLoader, self).__init__(symbols=symbols)
        # Maximum number of transactions to load per coin. Overwritten by :meth:`.load`
        self.tx_count = 1000
        self.loaded = False

    def _list_txs(self, coin: Coin, batch=100) -> Generator[dict, None, None]:
        """
        Loads transactions for an individual token in batches of `batch`, conforms them to Deposit,
        then yields each one as a dict

        :param Coin coin:  The coin to load transactions for (uses ``our_account`` and ``symbol_id``)
        :param int batch:  Amount of transactions to request per batch
        :return: A generator yielding ``dict`` s as produced by :meth:`.clean_txs`
        """
        finished = False
        offset = txs_loaded = 0
        while not finished:
            self.load_batch(account=coin.our_account, symbol=coin.symbol_id, limit=batch, offset=offset)
            txs_loaded += len(self.transactions)
            # If there are less remaining TXs than batch size - this usually means we've hit the end of the results.
            # If that happens, or we've hit the transaction limit, then yield the remaining txs and exit.
            if len(self.transactions) < batch or txs_loaded >= self.tx_count:
                finished = True
            # Convert the transactions to Deposit format (clean_txs is generator, so must iterate it into list)
            txs = list(
                self.clean_txs(account=coin.our_account, symbol=coin.symbol_id, transactions=self.transactions)
            )
            # For RAM optimization, destroy the original transaction list, as it's not needed.
            del self.transactions
            offset += batch
            for tx in txs:
                yield tx
            # At this point, the current batch is exhausted. Destroy the tx array to save memory.
            del txs

    def clean_txs(self, account: str, symbol: str,
                  transactions: Iterable[SETransaction]) -> Generator[dict, None, None]:
        """
        Filters a list of transactions by the receiving account, yields dict's conforming with
        :class:`payments.models.Deposit`

        Transactions which fail to parse are logged and skipped, rather than aborting the whole batch.

        :param str account:        The 'to' account to filter by
        :param str symbol:         The symbol of the token being filtered
        :param list transactions:  An iterable of transactions to filter
        :return: A generator yielding ``dict`` s conforming to :class:`payments.models.Deposit`
        """
        for tx in transactions:
            try:
                log.debug("Cleaning SENG transaction: %s", tx)
                if not tx.sender or not tx.to:
                    log.debug("SENG TX missing from/to - skipping")
                    continue
                # Ignore token issues and market transactions
                if tx.sender.lower() in ['tokens', 'market']:
                    log.debug("SENG TX from tokens/market - skipping")
                    continue
                # If we aren't the receiver, we don't need it.
                if tx.to.lower() != account.lower():
                    log.debug("SENG TX is to account '%s' - but we're account '%s' - skipping",
                              tx.to.lower(), account.lower())
                    continue
                # Cache the token for 5 mins, so we aren't spamming the token API
                token = cache.get_or_set('stmeng:'+symbol, lambda: self.get_rpc(symbol).get_token(symbol), 300)

                q = tx.quantity
                # Floats are rendered at the token's precision first, so Decimal() doesn't pick up
                # binary floating point noise.
                if isinstance(q, float):
                    q = ('{0:.' + str(token['precision']) + 'f}').format(tx.quantity)
                _txid = tx.raw_data.get('txid', tx.raw_data.get('transactionId'))
                clean_tx = dict(
                    txid=_txid, coin=self.coins[symbol].symbol, tx_timestamp=convert_datetime(tx['timestamp']),
                    from_account=tx.sender, to_account=tx.to, memo=tx.memo,
                    amount=Decimal(q)
                )
            except Exception:
                log.exception('Error parsing transaction data. Skipping this TX. tx = %s', tx)
                continue
            # The yield sits outside the try block on purpose: closing the generator raises GeneratorExit
            # at the yield, and that must propagate instead of being treated as a parsing error.
            yield clean_tx

    def load_batch(self, account, symbol, limit=100, offset=0, retry=0):
        """
        Load SteemEngine transactions for account/symbol into self.transactions with automatic retry on error

        :param str account:  The account to list transactions for
        :param str symbol:   The token symbol to list transactions for
        :param int limit:    Maximum amount of transactions to request
        :param int offset:   Amount of transactions to skip from the start of the results
        :param int retry:    Amount of retries already used up (failures are retried until this reaches 3)
        :raises Exception:   When the transactions still could not be loaded after the final retry
        """
        attempt = retry
        while True:
            try:
                self.transactions = self.get_rpc(symbol).list_transactions(
                    account, symbol, limit=limit, offset=offset
                )
                return
            except Exception:
                log.exception('Something went wrong while loading transactions for symbol %s account %s',
                              symbol, account)
                if attempt >= 3:
                    log.error('Could not load TX data after 3 tries. Aborting.')
                    raise Exception('Failed to load TX data for {}:{} after 3 tries.'.format(account, symbol))
                log.error('Will try again in a few seconds.')
                sleep(3)
                attempt += 1

    def list_txs(self, batch=100) -> Generator[dict, None, None]:
        """
        Get transactions for all coins in `self.coins` where the 'to' field matches coin.our_account
        If :meth:`.load()` hasn't been ran already, it will automatically call self.load()

        If loading fails for one coin, the error is logged and the remaining coins are still processed.

        :param batch: Amount of transactions to load per batch
        :return: Generator yielding dict's that conform to :class:`models.Deposit`
        """
        if not self.loaded:
            self.load()
        for symbol, c in self.coins.items():
            try:
                for tx in self._list_txs(coin=c, batch=batch):
                    yield tx
            except Exception:
                # Deliberately not a bare ``except``: GeneratorExit (raised at the yield when the caller
                # closes this generator) has to propagate, otherwise the next yield raises RuntimeError.
                log.exception('Something went wrong while loading transactions for coin %s. Skipping for now.', c)
                continue

    def load(self, tx_count=1000):
        """
        Prepare the loader: store the per-coin transaction limit, and drop any coins which have
        no ``our_account`` set (removing them from both ``self.coins`` and ``self.symbols``).

        :param int tx_count: Maximum amount of transactions to load per coin
        """
        log.info('Loading Steem Engine transactions...')
        self.tx_count = tx_count
        # Iterate over a snapshot, because coins may be deleted from self.coins inside the loop.
        for symbol, coin in list(self.coins.items()):
            if empty(coin.our_account):
                log.warning('The coin %s does not have `our_account` set. Refusing to load transactions.', coin)
                del self.coins[symbol]
                self.symbols = [s for s in self.symbols if s != symbol]
        self.loaded = True