Python: Why isn't my WebSocket coroutine being called in the code below?

Posted by zaqlnxep on 2023-04-28 in Python

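I'm using Alpaca-py, Alpaca's new Python package, to build a basic trading bot. My goal is for the bot to place a trade (buy), get data back from Alpaca's webhook about whether the order was filled (plus some other information), and then place another trade with the same stock (sell). Before I tried to integrate the webhook, the bot bought and sold just fine. However, I can't seem to get the coroutine up and running.
I have tried the following:
1. Moving the await statement to different places in the coroutine.
2. Changing where the methods are located and removing async from various methods.
3. Reading Alpaca's documentation. (Unfortunately, alpaca-py launched in 2023 and a lot of their documentation is outdated.)
4. Reading the TradingStream code to make sure I'm doing everything correctly. It all looks right.
5. Changing the asyncio.gather call and running them as routines instead. I get the same result.
6. Adding logger statements to the code. This tells me that my method 'trade_update_handler' is not being called, because nothing is printed to the console.
7. Using 'run()' instead of '_run_forever()', but this causes an error on the webhook side.
I'm running the bot with Django because I like its BaseCommand class. I don't think Django has anything to do with the problem. Here is my code: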

class TradeMaker():
    def __init__(self, **kwargs):
        self.paper_bool = kwargs.get('paper_bool', True)
        self.random_bool = kwargs.get('random', True)
        self.symbol_or_symbols = kwargs.get('symbol_or_symbols', 'AAPL')
        self.amount = kwargs.get('amount', 40000)
        self.seconds_between = kwargs.get('seconds_between', 4)
        self.log = kwargs.get('log')
        self.trading_client, self.trading_stream, self.account = self.open_client()
        self.trade_update_info = None
        self.order_filled = False
        self.shares_bought = 0
        self.current_symbol = None
    
    def open_client(self):
        trading_client = TradingClient(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        try:
            account = trading_client.get_account()
        except Exception as e:
            logger.error(f"Exception in login: {e}")
        return trading_client, trading_stream, account
    
    async def trade_update_handler(self, data):
        logger.info('Trade Update called')
        print("Trade Update:", data)
        if data.event == TradeEvent.FILL:
            if data.order.side == OrderSide.BUY:
                self.order_filled = True
                self.shares_bought = data.order.filled_qty
                self.current_symbol = data.order.symbol

    async def run_stream(self):
        logger.info('Subscribing to trade updates')
        self.trading_stream.subscribe_trade_updates(self.trade_update_handler)
        logger.info('Preparing stream')
        await self.trading_stream._run_forever()

    async def stop_stream(self):
        logger.info('Stopping stream')
        trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=self.paper_bool)
        await trading_stream.stop()

    def get_symbol(self):
        if self.random_bool:
            symbol = random.choice(self.symbol_or_symbols)
            return symbol
        else:
            symbol = self.symbol_or_symbols
            return symbol

    def buy(self):
        symbol = self.get_symbol()
        market_order_data = MarketOrderRequest(
            symbol=symbol,
            qty=1,
            side=OrderSide.BUY,
            time_in_force=TimeInForce.DAY
        )
        try:
            market_order_buy = self.trading_client.submit_order(
                    order_data=market_order_data
            )
        except Exception as e:
            logger.error(f"Failed to buy {symbol}: {e}")
            return None
        return symbol, market_order_buy

    def sell(self, symbol):
        symbol = symbol
        shares = self.shares_bought
        
        market_order_data = MarketOrderRequest(
            symbol=symbol,
            qty=250,
            side=OrderSide.SELL,
            time_in_force=TimeInForce.DAY
        )
        try:
            market_order_sell = self.trading_client.submit_order(
                    order_data=market_order_data
            )
        except Exception as e:
            logger.error(f"Failed to sell {symbol}: {e}")
            return None
        return market_order_sell

    async def make_trades(self):
        market_close = datetime.datetime.now().replace(hour=14, minute=0, second=0, microsecond=0)
        while datetime.datetime.now() < market_close:
            seconds = self.seconds_between
            try:
                symbol, market_order_buy = self.buy()
                print(f"Bought {symbol}: {market_order_buy}")
            except Exception as e:
                logger.error(f"Failed to buy during trade: {e}")
                return None
            while not self.order_filled:
                logger.info('Waiting for order status update')
                await asyncio.sleep(1)
            sleep(seconds)
            try:
                market_order_sell = self.sell(symbol=symbol)
                print(f"Sold {self.current_symbol}: {market_order_sell}")
            except Exception as e:
                logger.error(f"Failed to sell during trade: {e}")
                return None
            self.order_filled = False
            self.shares_bought = 0
            sleep(seconds)
        print('Market closed, shutting down.')

class Command(BaseCommand):
    help = """This bot trades the target stock. If you want it to choose randomly, pass it a list and set the variable random=True
    """
    model = None

    def add_arguments(self, parser):
        parser.add_argument(
            '--paper',
            type=bool,
            help='Set false to live trade.',
            default=True
        )
        parser.add_argument(
            '--folder',
            type=str,
            help='source folder for files',
            default=''
        )
        parser.add_argument(
            '--symbol',
            type=str,
            help='target symbol, or list of symbols',
            default='AAPL'
        )
        parser.add_argument(
            '--random',
            type=bool,
            help="Set to true if passing a list of symbols to choose randomly from.",
            default=False
        )
        parser.add_argument(
            '--tradevalue',
            type=int,
            help="The amount the bot should trade. e.g. $40000",
            default=40000
        )
        parser.add_argument(
            '--seconds',
            type=int,
            help="The number of seconds the bot should wait between each trade.",
            default=4
        )

    def handle(self, **options):
        paper_bool = options['paper']
        random_bool = options['random']
        symbol_or_symbols = options['symbol']
        amount = options['tradevalue']
        seconds_between = options['seconds']
        log = options['folder']
        tm = TradeMaker(
            paper_bool = paper_bool,
            random = random_bool,
            symbol_or_symbols = symbol_or_symbols,
            amount = amount,
            seconds_between = seconds_between,
            log = log
        )
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(asyncio.gather(
                tm.run_stream(),
                tm.make_trades()
            ))
        except KeyboardInterrupt:
            tm.stop_stream()
            print("Stopped with Interrupt")
        finally:
            tm.stop_stream()
            loop.close()
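
One detail in the `handle` method above that is easy to miss: `stop_stream` is declared with `async def`, so calling `tm.stop_stream()` without awaiting it only creates a coroutine object, and its body never runs. The sketch below illustrates that general asyncio behaviour with a hypothetical stand-in coroutine (this `stop_stream` is not Alpaca's method). It is not offered as a diagnosis of the missing trade updates.

```python
import asyncio

calls = []

async def stop_stream():
    # Hypothetical stand-in for an async cleanup method.
    calls.append("stopped")

def cleanup_without_await():
    # Calling an async function only builds a coroutine object; nothing runs.
    coro = stop_stream()
    coro.close()  # closed explicitly to avoid the "never awaited" warning
    return list(calls)

def cleanup_with_run():
    # Driving the coroutine on an event loop actually executes its body.
    asyncio.run(stop_stream())
    return list(calls)

before = cleanup_without_await()
after = cleanup_with_run()
print(before, after)
```

In `handle`, the equivalent would be something like `loop.run_until_complete(tm.stop_stream())` before `loop.close()`. Note also that `stop_stream` as posted builds a new `TradingStream` rather than stopping `self.trading_stream`.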

When I run the command, I get the following output in the terminal (information redacted for security):

python manage.py trade_maker_v5
2023-03-30 11:51:48,342 - INFO - Subscribing to trade updates
2023-03-30 11:51:48,342 - INFO - Preparing stream
Bought AAPL: id=UUID('foo') client_order_id='bar' created_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995853, tzinfo=datetime.timezone.utc) updated_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 995921, tzinfo=datetime.timezone.utc) submitted_at=datetime.datetime(2023, 3, 30, 18, 51, 49, 994623, tzinfo=datetime.timezone.utc) filled_at=None expired_at=None canceled_at=None failed_at=None replaced_at=None replaced_by=None replaces=None asset_id=UUID('foo') symbol='AAPL' asset_class=<AssetClass.US_EQUITY: 'us_equity'> notional=None qty='1' filled_qty='0' filled_avg_price=None order_class=<OrderClass.SIMPLE: 'simple'> order_type=<OrderType.MARKET: 'market'> type=<OrderType.MARKET: 'market'> side=<OrderSide.BUY: 'buy'> time_in_force=<TimeInForce.DAY: 'day'> limit_price=None stop_price=None status=<OrderStatus.PENDING_NEW: 'pending_new'> extended_hours=False legs=None trail_percent=None trail_price=None hwm=None
2023-03-30 11:51:48,480 - INFO - Waiting for order status update
2023-03-30 11:51:49,493 - INFO - Waiting for order status update
2023-03-30 11:51:50,500 - INFO - Waiting for order status update

If I run the webhook in a separate terminal while my bot is running, it works. I can run the following code:

from alpaca.trading.stream import TradingStream

trading_stream = TradingStream(ALPACA_ID, ALPACA_KEY, paper=True)

async def update_handler(data):
    print(data)

trading_stream.subscribe_trade_updates(update_handler)
trading_stream.run()

It prints out all the data while my bot is running. Why does it work on its own, but not inside a coroutine?
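
A general asyncio point that bears on `make_trades`: the bare `sleep(seconds)` calls (presumably `time.sleep`, since the import is not shown) block the whole event loop, so no other coroutine, including a stream handler, can run while they wait. `await asyncio.sleep(...)` yields control instead. The following is a minimal sketch of the difference, with a hypothetical `ticker` coroutine standing in for the stream; it may or may not be what is wrong in the bot.

```python
import asyncio
import time

async def ticker(ticks, stop):
    # Stand-in for a stream consumer: records a tick each time it gets to run.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)

async def worker(blocking, stop):
    if blocking:
        time.sleep(0.2)           # blocks the event loop; ticker cannot run
    else:
        await asyncio.sleep(0.2)  # yields; ticker keeps running
    stop.set()

async def count_ticks(blocking):
    ticks = []
    stop = asyncio.Event()
    await asyncio.gather(ticker(ticks, stop), worker(blocking, stop))
    return len(ticks)

blocked = asyncio.run(count_ticks(blocking=True))
cooperative = asyncio.run(count_ticks(blocking=False))
print(blocked, cooperative)
```

With the blocking sleep, the ticker runs only before the worker starts; with `asyncio.sleep` it keeps ticking throughout. The synchronous `buy()` and `sell()` calls hold the loop in the same way for as long as each request takes.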

Answer #1, by 2w2cym1i

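Django was causing the problem. After I removed the bot from Django, made it a standalone bot, and added a few async statements, it now works.
Django recently added more async support in 4.2, but this project was running Django 4.1.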
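
The answer does not show the standalone version. As a rough sketch of the shape it describes, the following runs a stream consumer and a trading loop together under `asyncio.run`, outside Django. `FakeStream`, its methods, the `Bot` class, and the event dictionary are all hypothetical stand-ins so the example runs without credentials; they are not alpaca-py's API.

```python
import asyncio

class FakeStream:
    """Hypothetical stand-in for a trade-update stream (not alpaca-py's API)."""

    def __init__(self):
        self._handler = None
        self._queue = asyncio.Queue()

    def subscribe(self, handler):
        self._handler = handler

    async def push(self, event):
        # Simulates the server sending an update.
        await self._queue.put(event)

    async def run(self):
        # Deliver queued events to the handler until a None sentinel arrives.
        while True:
            event = await self._queue.get()
            if event is None:
                break
            await self._handler(event)

class Bot:
    def __init__(self, stream):
        self.stream = stream
        self.filled = asyncio.Event()
        self.log = []

    async def on_update(self, event):
        self.log.append(f"update:{event['event']}")
        if event["event"] == "fill":
            self.filled.set()

    async def trade_once(self):
        self.log.append("buy submitted")
        await self.stream.push({"event": "fill"})  # pretend the order filled
        # Wait for the handler to signal instead of polling a flag.
        await asyncio.wait_for(self.filled.wait(), timeout=1)
        self.log.append("sell submitted")
        await self.stream.push(None)  # tell the stream to stop

async def main():
    stream = FakeStream()
    bot = Bot(stream)
    stream.subscribe(bot.on_update)
    await asyncio.gather(stream.run(), bot.trade_once())
    return bot.log

log = asyncio.run(main())
print(log)
```

Using an `asyncio.Event` here is a design choice for the sketch, not something the answer mentions: it lets the trading coroutine suspend until the handler fires, which avoids a polling loop around a boolean flag.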
