Source code for payments.coin_handlers.Hive.HiveLoader

from typing import Any, Dict, List, Generator
import logging
from decimal import Decimal, getcontext, ROUND_DOWN
from typing import Dict, List, Iterable, Generator, Union

import pytz
from beem.asset import Asset
from dateutil.parser import parse
from django.utils import timezone

from payments.coin_handlers import BaseLoader
from beem.account import Account

from payments.coin_handlers.Hive.HiveMixin import HiveMixin
from steemengine.helpers import empty

log = logging.getLogger(__name__)
getcontext().rounding = ROUND_DOWN


[docs]class HiveLoader(BaseLoader, HiveMixin): provides = ["HIVE", "HBD"] # type: List[str] """ This attribute is automatically generated by scanning for :class:`models.Coin` s with the type ``steembase``. This saves us from hard coding specific coin symbols. See __init__.py for populating code. """ def __init__(self, symbols): super(HiveLoader, self).__init__(symbols=symbols) self.tx_count = 100 self.loaded = False self._rpc = None self._rpcs = {}
[docs] def list_txs(self, batch=0) -> Generator[dict, None, None]: if not self.loaded: self.load() for symbol, c in self.coins.items(): acc_name = c.our_account acc = Account(acc_name, steem_instance=self.get_rpc(c.symbol_id)) # get_account_history returns a generator with automatic batching, so we don't have to worry about batches. txs = acc.get_account_history(-1, self.tx_count, only_ops=['transfer']) yield from self.clean_txs(symbol=c.symbol_id, transactions=txs, account=acc_name)
@property def settings(self) -> Dict[str, dict]: """To ensure we always get fresh settings from the DB after a reload""" return dict(((sym, c.settings) for sym, c in self.coins.items()))
[docs] def load(self, tx_count=10000): # Unlike other coins, it's important to load a lot of TXs, because many won't actually be transfers # Thus the default TX count for Steem is 10,000 self.tx_count = tx_count for symbol, coin in self.coins.items(): if not empty(coin.our_account): continue log.warning('The coin %s does not have `our_account` set. Refusing to load transactions.', coin) del self.coins[symbol] self.symbols = [s for s in self.symbols if s != symbol]
[docs] def clean_txs(self, symbol: str, transactions: Iterable[dict], account: str = None) -> Generator[dict, None, None]: """ Filters a list of transactions `transactions` as required, yields dict's conforming with :class:`models.Deposit` - Filters out transactions that are not marked as 'receive' - Filters out mining transactions - Filters by address if `account` is specified - Filters out transactions that don't have enough confirms, and are not reported as 'trusted' :param symbol: Symbol of coin being cleaned :param transactions: A ``list<dict>`` or generator producing dict's :param account: If not None, only return TXs sent to this address. :return Generator<dict>: A generator outputting dictionaries formatted as below Output Format:: { txid:str, coin:str (symbol), vout:int, tx_timestamp:datetime, address:str, amount:Decimal } """ log.debug('Filtering transactions for %s', symbol) for tx in transactions: try: t = self.clean_tx(tx, symbol, account) if t is None: continue # Re-write the coin symbol into the database symbol t['coin'] = self.coins[symbol].symbol yield t except (AttributeError, KeyError) as e: log.warning('Steem TX missing important key? %s', str(e)) except: log.exception('Error filtering Steem TX, skipping... TX data: %s', tx)
[docs] def clean_tx(self, tx: dict, symbol: str, account: str, memo: str = None, memo_case: bool = False) -> Union[dict, None]: """Filters an individual transaction. See :meth:`.clean_txs` for info""" # log.debug(tx) if tx.get('type', 'NOT SET') != 'transfer': log.debug('Steem TX is not transfer. Type is: %s', tx.get('type', 'NOT SET')) return None txid = tx.get('trx_id', None) _am = tx['amount'] # Transfer ops contain a dict 'amount', containing amount:int, nai:str, precision:int if type(_am) is str: # Extract and validate asset 'ABC' from '12.345 ABC' amt, _symbol = _am.split() _asset = Asset(symbol, steem_instance=self.get_rpc(symbol)) else: # Conv asset ID (e.g. @@000000021) to symbol, i.e. "STEEM" _asset = Asset(_am['nai'], steem_instance=self.get_rpc(symbol)) # Convert integer amount/precision to Decimal's, preventing floating point issues amt_int = Decimal(_am['amount']) amt_prec = Decimal(_am['precision']) amt = amt_int / (Decimal(10) ** amt_prec) # Use precision value to convert from integer amt to decimal amt # Get validated symbol from beem Asset amt_sym = str(_asset.symbol) if amt_sym != symbol: # If the symbol doesn't match the symbol we were passed, skip this TX return None tx_memo = tx.get('memo') log.debug('Filtering/cleaning steem transaction, Amt: %f, TXID: %s', amt, txid) if tx['to'] != account or tx['from'] == account: return None # If the transaction isn't to us (account), or it's from ourselves, ignore it. if not empty(memo) and (tx_memo != memo or (not memo_case and tx_memo.lower() != memo.lower())): return None d = parse(tx['timestamp']) d = timezone.make_aware(d, pytz.UTC) return dict( txid=txid, coin=symbol, vout=int(tx.get('op_in_trx', 0)), tx_timestamp=d, from_account=tx.get('from', None), to_account=tx.get('to', None), memo=tx_memo, amount=Decimal(amt) )