python recursive fx cross rates lookup?

inb24sb2  于 2023-07-01  发布在  Python
关注(0)|答案(2)|浏览(142)

给定dict中的利率,例如:

rates = {
        'BTC/USDT' : 30000,         <-- path A, hop 1
        'ETH/USDT' : 1875,
        'BTC/ETH'  : 16,            <-- path B, hop 1
        'DOGE/ETH' : 0.0000342,     <-- path B, hop 2
        'DOGE/USDC' : 0.06267,      <-- path B, hop 3
        'USDT/USDC' : 1.0005        <-- path A, hop 2
    }

我如何计算外汇交叉汇率递归?

def calc_fx(base_ccy: str, quote_ccy: str, rates: Dict[str, float]) -> float:
    if not any([ x for x in rates if x.split('/')[0]==base_ccy or x.split('/')[-1]==base_ccy or x.split('/')[0]==quote_ccy or x.split('/')[-1]==quote_ccy]):
       return None
    if [ x for x in rates if x.split('/')[0]==base_ccy and x.split('/')[-1]==quote_ccy]:
        return rates[f"{base_ccy}/{quote_ccy}"]
    elif [ x for x in rates if x.split('/')[-1]==base_ccy and x.split('/')[0]==quote_ccy]:
        return 1 / rates[f"{quote_ccy}/{base_ccy}"]
    else:
        # How to look up rates recursively?
        return None

base_ccy : str = 'BTC'
quote_ccy : str = 'USDC'
rate = calc_fx(base_ccy, quote_ccy, rates)
print(rate)

例如,BTC/USDC =?,无法从字典中直接查找。你需要通过三个跳跃递归地完成这一点。找到跳数最少的路径?例如,将选择路径A两跳。如果没有找到路径,则返回None。

Path A
    hop 1 BTC/USDT 30000
    hop 2 USDT/USDC 1.0005

    BTC/USDC = 30000 * 1.0005 = 30015

Path B
    hop 1 BTC/ETH = 16
        hop 2 DOGE/ETH = 0.0000342
            hop 3 DOGE/USDC = 0.06267

    BTC/USDC = 16 * (1/0.0000342) * 0.06267 = 29319.30

在上面,将选择路径A。
这个BFS/DFS问题的另一个改进是返回所有可能的路径

我在下面发布了解决方案。

r7s23pms

r7s23pms1#

这就是:

from typing import List, Dict

rates = {
    'BTC/USDT' : 30000, 
    'ETH/USDT' : 1875,
    'BTC/ETH'  : 16,
    'DOGE/ETH' : 0.0000342,
    'DOGE/USDC' : 0.06267,
    'USDT/USDC' : 1.0005
}

class Node():
    def __init__(self, ticker : str, rate : float, parent) -> None:
        self.ticker = ticker
        self.rate = rate
        self.children = []

        if parent:
            self.parent = parent
            parent.children.append(self)
        else:
            self.parent = None
        
def find_root(node):
    target = None
    while node:
        target = node
        node = node.parent
    return target

def trace_to_root(node):
    trace = []
    while node:
        if node.rate!=0:
            trace.append(node)
        node = node.parent
    return trace

def flip(ticker):
    return f"{ticker.split('/')[-1]}/{ticker.split('/')[0]}"

def compute_paths(base_ccy: str, quote_ccy: str, rates: Dict[str, float], parent, solutions : List, visited : List) -> Node:
    ticker = f"{base_ccy}/{quote_ccy}"
    if ticker  in visited or flip(ticker) in visited:
        return
    
    visited.append(f"{base_ccy}/{quote_ccy}")
    visited.append(flip(f"{base_ccy}/{quote_ccy}"))
    
    inscope_rates = [ x for x in rates if x.split('/')[0]==base_ccy or x.split('/')[-1]==base_ccy or x.split('/')[0]==quote_ccy or x.split('/')[-1]==quote_ccy]
    if not any(inscope_rates):
        return
    
    root = find_root(parent) if parent.parent else parent
    
    if [ x for x in inscope_rates if x.split('/')[0]==base_ccy and x.split('/')[-1]==quote_ccy]:
        node = Node(f"{base_ccy}/{quote_ccy}", rates[f"{base_ccy}/{quote_ccy}"], parent)
        is_final = (node.ticker.split('/')[0]==root.ticker.split('/')[-1] or node.ticker.split('/')[-1]==root.ticker.split('/')[-1])
        if is_final:
            solutions.append(trace_to_root(node))

    elif [ x for x in inscope_rates if x.split('/')[-1]==base_ccy and x.split('/')[0]==quote_ccy]:
        node = Node(f"{quote_ccy}/{base_ccy}", 1 / rates[f"{quote_ccy}/{base_ccy}"], parent)
        is_final = (node.ticker.split('/')[0]==root.ticker.split('/')[-1] or node.ticker.split('/')[-1]==root.ticker.split('/')[-1])
        if is_final:
            solutions.append(trace_to_root(node))
    
    else:
        for pair in inscope_rates:
            pair_base, pair_quote = pair.split('/')[0], pair.split('/')[-1]
            if base_ccy==pair_base and pair not in visited:
                node = Node(pair, rates[pair], parent)
                visited.append(node.ticker)
                visited.append(flip(node.ticker))
                is_final = (node.ticker.split('/')[0]==root.ticker.split('/')[-1] or node.ticker.split('/')[-1]==root.ticker.split('/')[-1])
                if not is_final:
                    compute_paths(pair_quote, quote_ccy, rates, node, solutions, visited)
                else:
                    solutions.append(trace_to_root(node))

            elif base_ccy==pair_quote and pair not in visited:
                node = Node(pair, rates[pair], parent)
                visited.append(node.ticker)
                visited.append(flip(node.ticker))
                is_final = (node.ticker.split('/')[0]==root.ticker.split('/')[-1] or node.ticker.split('/')[-1]==root.ticker.split('/')[-1])
                if not is_final:
                    compute_paths(pair_base, quote_ccy, rates, node, solutions, visited)
                else:
                    solutions.append(trace_to_root(node))

def calc_fx(path : List[Node]):
    last_ticker = None
    overall_fx = None
    for node in reversed(path):
        ticker = node.ticker
        fx = node.rate 
        if last_ticker:
            fx = fx if ticker.split('/')[0]==last_ticker.split('/')[0] else 1/fx
        overall_fx = fx if not overall_fx else fx * overall_fx
        last_ticker = ticker
    return overall_fx
    
base_ccy : str = 'BTC' # Try BTC or ETH for multi hops
quote_ccy : str = 'USDC'
root = Node(f"{base_ccy}/{quote_ccy}", 0, None)
visited = []
solutions = []
compute_paths(base_ccy, quote_ccy, rates, root, solutions, visited)

for path in solutions:
    fx = calc_fx(path)
    print(f"fx: {fx}, #hops: {len(path)}")
kgqe7b3p

kgqe7b3p2#

您可以使用igraph来获得最短路径,然后计算货币:

import pandas as pd, igraph as ig, numpy as np
class Convert:
    def __init__(self, rates):
        df = pd.DataFrame(rates, index = [0]).melt()
        d = df.variable.str.split('/', expand = True)
        e = pd.concat([d, d.rename(columns={0:1,1:0})])
        self.rates = rates
        self.f = e.assign(value = np.r_[df.value, 1/df.value])
        self.a = pd.Categorical(e.to_numpy().flatten())
        self.graph = ig.Graph(self.a.codes.reshape(e.shape))

    def calc_fx(self,base_ccy, quote_ccy):
        nodes = np.where(np.in1d(self.a.categories, [base_ccy, quote_ccy]))[0]
        path = self.graph.get_shortest_paths(*nodes)
        vals = self.a.from_codes(path, dtype = self.a.dtype).to_numpy().flatten()
        return self.f.merge(pd.DataFrame(np.c_[vals[:-1], vals[1:]])).value.prod()

rates1 = {
    'BTC/USDT' : 30000,
    'ETH/USDT' : 1875,
    'BTC/ETH'  : 16,    # hop 1
    'DOGE/ETH' : 0.0000342, # hop 2
    'DOGE/USDC' : 0.06267 # hop 3
}
rates2 = {
        'BTC/USDT' : 30000,         #<-- path A, hop 1
        'ETH/USDT' : 1875,
        'BTC/ETH'  : 16,            ##<-- path B, hop 1
        'DOGE/ETH' : 0.0000342,     #<-- path B, hop 2
        'DOGE/USDC' : 0.06267,      #<-- path B, hop 3
        'USDT/USDC' : 1.0005        #<-- path A, hop 2
    }

Convert(rates1).calc_fx('BTC', 'USDC')
Out[308]: 29319.29824561404

Convert(rates2).calc_fx('BTC', 'USDC')
Out[309]: 30015.0

上面的原因是能够重用存储在类中的值:

A = Convert(rates1)
A.calc_fx('BTC', 'USDC')
Out[315]: 29319.29824561404

A.calc_fx('DOGE', 'USDT')
Out[316]: 0.064125

B = Convert(rates2)
B.calc_fx('BTC', 'USDC')
Out[317]: 30015.0

B.calc_fx('DOGE', 'USDT')
Out[318]: 0.064125

相关问题