Async Streaming

A convenient wrapper around the Streaming API

IMPORTANT: Polygon.io allows one simultaneous connection per cluster (clusters: stocks, options, forex, crypto), which means at most 4 concurrent streams in total, provided you have subscriptions for each of them.

Connecting to a cluster that already has a stream connected to it drops the existing connection and establishes the new one.
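Since a second connection to a cluster silently replaces the first, it can help to keep track of the clients a process has already created. The sketch below assumes a hypothetical ClusterRegistry helper (not part of the library) and a factory callable that builds the client; a plain dict stands in for the real client so the snippet runs without an API key.

```python
class ClusterRegistry:
    """Hand out at most one client per cluster (hypothetical helper, not library API)."""

    def __init__(self, factory):
        # factory(cluster) builds a client, for example:
        # lambda cluster: polygon.AsyncStreamClient('KEY', cluster)
        self._factory = factory
        self._clients = {}

    def get(self, cluster):
        # Reuse the existing client instead of opening a second connection,
        # which would drop the first one.
        if cluster not in self._clients:
            self._clients[cluster] = self._factory(cluster)
        return self._clients[cluster]

    def active_clusters(self):
        return sorted(self._clients)


# A dict stands in for a real client here.
registry = ClusterRegistry(factory=lambda cluster: {'cluster': cluster})
first = registry.get('stocks')
second = registry.get('stocks')   # same object, no second connection
```

This only guards against duplicates within one process; separate processes or machines sharing an API key would still displace each other.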

Note that this page describes the asyncio-based streaming client. If you’re looking for the callback-based streaming client, see Callback Streaming.

Also note that the async client already has a built-in reconnection mechanism. It is very basic at the moment: it resubscribes to the same set of services it had before the disconnection and restores the handlers once the connection is re-established. More info in Starting the Stream below.

It also exposes a few methods that you can use to build your own reconnect mechanism. polygon.streaming.async_streaming.AsyncStreamClient.reconnect() is one of them.

Have a reconnect mechanism to share? Share in discussions or on the wiki.

Creating the client

Creating a client just means creating an instance of polygon.AsyncStreamClient. It expects a few arguments, most of which have default values.

This is what the initializer looks like:

AsyncStreamClient.__init__(api_key: str, cluster, host='socket.polygon.io', ping_interval: int | None = 20, ping_timeout: int | None = 19, max_message_size: int = 1048576, max_memory_queue: int | None = 32, read_limit: int = 65536, write_limit: int = 65536)

Initializes the stream client for async streaming. See the Official Docs.

Parameters:
  • api_key – Your API Key. Visit your dashboard to get yours.

  • cluster – Which market/cluster to connect to. See polygon.enums.StreamCluster for choices. NEVER connect to the same cluster again while an existing stream is connected to it: the existing connection would be dropped and a new one established. You can have up to 4 concurrent streams connected to 4 different clusters.

  • host – Host URL to connect to. Default is real time. See polygon.enums.StreamHost for choices.

  • ping_interval – Send a ping to server every specified number of seconds to keep the connection alive. Defaults to 20 seconds. Setting to 0 disables pinging.

  • ping_timeout – The number of seconds to wait for the response (pong) after sending a ping. If no response is received from the server within that many seconds, the stream is considered dead and exits with code 1011. Defaults to 19 seconds.

  • max_message_size – Enforces the maximum size for incoming messages in bytes. The default value is 1 MiB (not MB). None disables the limit. If a message larger than the maximum size is received, recv() will raise ConnectionClosedError and the connection will be closed with code 1009.

  • max_memory_queue – Sets the maximum length of the queue that holds incoming messages. The default value is 32. None disables the limit. Messages are added to an in-memory queue when they’re received; then recv() pops from that queue.

  • read_limit – Sets the high-water limit of the buffer for incoming bytes. The low-water limit is half the high-water limit. The default value is 64 KiB, half of asyncio’s default. Don’t change it if you’re unsure what it implies.

  • write_limit – Sets the high-water limit of the buffer for outgoing bytes. The low-water limit is a quarter of the high-water limit. The default value is 64 KiB, equal to asyncio’s default. Don’t change it if you’re unsure what it implies.
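To make the size-related defaults concrete, the arithmetic below works out the byte values and the low-water marks described above. The variable names simply mirror the parameter names; nothing here touches the library.

```python
KIB = 1024
MIB = 1024 * KIB

max_message_size = 1 * MIB   # 1048576 bytes (1 MiB), not 1000000 (1 MB)
read_limit = 64 * KIB        # 65536 bytes, high-water limit for incoming bytes
write_limit = 64 * KIB       # 65536 bytes, high-water limit for outgoing bytes

read_low_water = read_limit // 2     # half of the high-water limit
write_low_water = write_limit // 4   # a quarter of the high-water limit

print(max_message_size, read_low_water, write_low_water)
# prints: 1048576 32768 16384
```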

Example use:

import polygon

stream_client = polygon.AsyncStreamClient('KEY', 'stocks')  # in the simplest form

Note that you don’t have to call any login methods; the library does that internally.

Starting the Stream

Once you have a stream client, you MUST subscribe to streams before you start the main stream loop. Note that you can easily alter your subscriptions from other coroutines even after starting the main stream loop. See the subscription methods described further down to learn how to subscribe to streams.

AFTER you have called your initial subscription methods, you have two ways to start the main stream loop.

Without using the built-in reconnect functionality

In this case you’d need to have your own while loop, like so:

# assuming the client has been created and subscribed to streams already
while True:
    await stream_client.handle_messages()

And that’s basically it. handle_messages takes care of receiving messages and calling the appropriate handlers (see the Handling Messages section below). You may want to implement your own reconnect mechanism here.

If that’s your use case, you can skip the next section entirely.
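One possible shape for such a loop, as a sketch only: retry with exponential backoff and stop on request. run_stream, its parameters, and the choice of ConnectionError as the retried exception are assumptions made for illustration; check which exceptions your installed version actually raises, and re-establish the connection (for example via reconnect()) where appropriate. The demo uses a stand-in receiver so it runs without a network connection.

```python
import asyncio


async def run_stream(receive_once, stop_event, retry_on=(ConnectionError,),
                     max_failures=5, base_delay=1.0, sleep=asyncio.sleep):
    """Call receive_once() in a loop; back off exponentially on failures."""
    failures = 0
    while not stop_event.is_set():
        try:
            await receive_once()
            failures = 0  # a successful receive resets the failure streak
        except retry_on:
            failures += 1
            if failures > max_failures:
                raise  # give up and let the caller decide what to do
            await sleep(base_delay * 2 ** (failures - 1))


async def demo():
    stop_event = asyncio.Event()
    outcomes = iter([ConnectionError('dropped'), ConnectionError('dropped'), None])
    delays = []

    async def flaky_receive():  # stand-in for stream_client.handle_messages
        outcome = next(outcomes, 'done')
        if isinstance(outcome, Exception):
            raise outcome
        if outcome == 'done':
            stop_event.set()

    async def record_delay(seconds):  # stand-in for asyncio.sleep
        delays.append(seconds)

    await run_stream(flaky_receive, stop_event, sleep=record_delay)
    return delays


delays = asyncio.run(demo())
```

With a real client you would pass stream_client.handle_messages as receive_once and set the event from wherever your application decides to shut down.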

Using the built-in reconnect functionality

Here you don’t need any outer while loop of your own. The library has inner while loops and mechanisms to trap disconnection errors, and it will attempt to reconnect.

Note that this functionality is basic and not perfect yet; it will continue to improve as we move ahead. If you figure out a way to implement reconnection, feel free to share it in discussions or on the wiki.

Simple use example:

# assuming we already have a client subscribed to streams
await stream_client.handle_messages(reconnect=True)

That’s it. This should be enough for most users. For those who need more control over the behavior, this is what the method definition looks like:

async AsyncStreamClient.handle_messages(reconnect: bool = False, max_reconnection_attempts=5, reconnection_delay=5)

The primary method to start the stream. It connects and logs in by itself, and allows reconnecting by simply setting a parameter (subscriptions are persisted across reconnected streams).

Parameters:
  • reconnect – If this is False (the default), it simply awaits the next message and calls the appropriate handler, using _default_process_message() if no handler was specified. You should call it inside a while loop in that case. Setting it to True creates an inner loop which traps disconnection errors (except a failed login due to an invalid key) and reconnects to the stream with the same subscriptions it had before getting disconnected.

  • max_reconnection_attempts – How many times the program should attempt to reconnect after failed attempts. The counter is reset as soon as a connection is successfully re-established. Setting it to False disables the limit, which is NOT recommended unless you have a specific need for it. This value is ignored if reconnect is False (the default). Defaults to 5.

  • reconnection_delay – Number of seconds to wait before attempting to reconnect after a failed reconnection attempt or a disconnection. This value is ignored if reconnect is False (the default). Defaults to 5.

Returns:

None
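For reference, here is how the non-default parameters might be passed. RecordingClient is a stand-in that only records calls so the snippet runs offline; it copies the method names and keyword arguments documented on this page and nothing else. The values 10 and 2 are arbitrary.

```python
import asyncio


class RecordingClient:
    """Stand-in with the documented method names; records calls instead of streaming."""

    def __init__(self):
        self.calls = []

    async def subscribe_stock_trades(self, symbols=None, handler_function=None,
                                     force_uppercase_symbols=True):
        self.calls.append(('subscribe_stock_trades', tuple(symbols)))

    async def handle_messages(self, reconnect=False, max_reconnection_attempts=5,
                              reconnection_delay=5):
        self.calls.append(('handle_messages', reconnect,
                           max_reconnection_attempts, reconnection_delay))


async def run(client):
    # Subscribe first, then start the loop with reconnection enabled.
    await client.subscribe_stock_trades(['AMD', 'NVDA'])
    await client.handle_messages(reconnect=True, max_reconnection_attempts=10,
                                 reconnection_delay=2)


client = RecordingClient()  # swap for polygon.AsyncStreamClient('KEY', 'stocks')
asyncio.run(run(client))
```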

Subscribing/Unsubscribing to Streams

All subscription methods follow the naming pattern subscribe_service_name and unsubscribe_service_name.

Symbol names must be specified as a list of symbols: ['AMD', 'NVDA', 'LOL'] is the correct way to specify symbols. Not specifying a list of symbols results in the action being applied to ALL tickers in that service. Note that any of [], None, ['*'] or 'all' as the value of symbols also results in ALL tickers.

The library allows specifying a string for the symbols argument (that string is sent exactly as it is, without processing), but only do that if you absolutely need to. Most people should just specify a list. Note that a list with a single ticker is accepted.

The Options and Crypto stream endpoints expect the prefixes O: and X: respectively in front of every ticker. The library handles this for you, so you can pass symbols with or without those prefixes.

The second argument on all subscribe methods is handler_function, the function you’d like the library to call when a message from that service is received. You can have one handler for multiple services. Not supplying a handler results in the library using the default message handler.

All methods are async coroutines which need to be awaited.

await stream_client.subscribe_stock_trades(['AMD', 'NVDA'], handler_function=my_handler_function)

By default, the library will also enforce upper case for all symbols being passed. To disable this enforcement, just pass in force_uppercase_symbols=False when subscribing in the methods below.
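The symbol rules above can be summed up in a few lines. normalize_symbols is a hypothetical helper written for illustration; it mirrors the documented behavior but is not the library’s code, and the real implementation may differ in details.

```python
def normalize_symbols(symbols, prefix='', force_uppercase=True):
    """Illustrative only: mirrors the documented rules, not the library's code."""
    if symbols in (None, [], ['*'], 'all'):
        return ['*']              # every ticker in the service
    if isinstance(symbols, str):
        return symbols            # strings are passed through untouched
    result = []
    for symbol in symbols:
        if force_uppercase:
            symbol = symbol.upper()
        if prefix and not symbol.upper().startswith(prefix.upper()):
            symbol = prefix + symbol   # add 'O:' or 'X:' when it is missing
        result.append(symbol)
    return result


stocks = normalize_symbols(['amd', 'Nvda'])
crypto = normalize_symbols(['btc-usd', 'X:ETH-USD'], prefix='X:')
everything = normalize_symbols(None)
```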

Handling Messages

Your handler functions should accept one argument, which is the message.

async def sample_handler(msg):
    print(f'Look at me! I am the handler now. {msg}')

Note that you can also use a sync function as a handler:

def sample_handler(msg):
    print(f'I am also a handler. But sync.. {msg}')

In async streaming, the library does the JSON decoding for you internally, so you will always receive a Python list or dict (a list 99.99% of the time, the initial status messages being the exception). You don’t have to do the JSON decoding yourself; internally it is already done using json.loads(msg).
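Since a message may arrive as a list of events or as a single dict, a handler can normalize both shapes before processing. A small sketch, assuming each event is a dict with an ev key naming its type (the other fields and values below are made up):

```python
from collections import Counter

event_counts = Counter()


def counting_handler(msg):
    """Accept either a list of events or a single dict."""
    events = msg if isinstance(msg, list) else [msg]
    for event in events:
        # 'ev' is assumed to carry the event type (e.g. 'T' for trades).
        event_counts[event.get('ev', 'unknown')] += 1


# Sample payloads shaped like already-decoded messages.
counting_handler({'ev': 'status', 'message': 'connected'})
counting_handler([{'ev': 'T', 'sym': 'AMD'}, {'ev': 'T', 'sym': 'NVDA'}])
```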

Once you have the message in your handler function, you can process it the way you want: print it out, write it to a file, push it to a Redis queue, write it to a database, or offload it to a multi-threaded queue.
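If processing is slow, one common pattern is to keep the handler tiny and let a separate worker do the heavy lifting through an asyncio.Queue. The sketch below feeds the handler by hand instead of using a live stream; the queue size and sample messages are arbitrary.

```python
import asyncio


async def demo():
    queue = asyncio.Queue(maxsize=1000)
    processed = []

    async def enqueue_handler(msg):
        # Keep the handler fast: just hand the message to the queue.
        await queue.put(msg)

    async def worker():
        while True:
            msg = await queue.get()
            processed.append(msg)  # replace with a database write, file append, ...
            queue.task_done()

    worker_task = asyncio.create_task(worker())
    # Stand-in for the library calling the handler with decoded messages.
    for msg in ([{'ev': 'T', 'sym': 'AMD'}], [{'ev': 'T', 'sym': 'NVDA'}]):
        await enqueue_handler(msg)
    await queue.join()     # wait until the worker has drained the queue
    worker_task.cancel()
    return processed


processed = asyncio.run(demo())
```

In a real application enqueue_handler would be passed as handler_function when subscribing, and the worker would run for the lifetime of the stream.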

The default handler for the messages is _default_process_message.

Changing message handler functions while stream is running

The library allows you to change your handlers after your main stream loop has started running.

The function you’d need is:

async AsyncStreamClient.change_handler(service_prefix, handler_function)

Change your handler function for a service. Can be used to update handlers dynamically while stream is running.

Parameters:
  • service_prefix – The prefix of the service you want to change the handler for. See polygon.enums.StreamServicePrefix for choices.

  • handler_function – The new handler function to assign for this service

Returns:

None

Note that you should never need to change the handler for status (which handles ev messages) unless you have a specific need for it. Service prefixes just indicate which service (e.g. stock trades or options aggregates) you want to change the handler for.
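Conceptually, per-service handlers behave like a lookup table keyed by service prefix. The toy HandlerTable below is not library code; it only models that idea so the effect of swapping a handler mid-stream is visible. The prefix 'T' for trades is an assumption here; take real values from polygon.enums.StreamServicePrefix.

```python
import asyncio


class HandlerTable:
    """Toy model of per-service handlers keyed by service prefix (not library code)."""

    def __init__(self):
        self.handlers = {}

    async def change_handler(self, service_prefix, handler_function):
        self.handlers[service_prefix] = handler_function

    def dispatch(self, msg):
        # Route the message to whichever handler is registered right now.
        self.handlers[msg['ev']](msg)


seen = []


def verbose(msg):
    seen.append(('verbose', msg['sym']))


def quiet(msg):
    seen.append(('quiet', msg['sym']))


async def demo():
    table = HandlerTable()
    await table.change_handler('T', verbose)
    table.dispatch({'ev': 'T', 'sym': 'AMD'})
    await table.change_handler('T', quiet)   # swap while "running"
    table.dispatch({'ev': 'T', 'sym': 'NVDA'})


asyncio.run(demo())
```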

Closing the Stream

To turn off the streamer and shut down the websocket connection gracefully, it is advised to await stream_client.close_stream() when closing the application. It is not an absolute necessity, but it is good software practice.
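A try/finally block is one way to make sure close_stream() is awaited however the loop ends. FakeStream below is a stand-in with the two documented coroutine names so the sketch runs offline; cancelling the task simulates the application shutting down.

```python
import asyncio


class FakeStream:
    """Stand-in exposing the two documented coroutine names used below."""

    def __init__(self):
        self.closed = False

    async def handle_messages(self, reconnect=False):
        await asyncio.sleep(3600)  # pretend to stream until cancelled

    async def close_stream(self):
        self.closed = True


async def stream_until_cancelled(client):
    try:
        await client.handle_messages(reconnect=True)
    finally:
        # Runs on normal exit, on errors, and on cancellation.
        await client.close_stream()


async def demo():
    client = FakeStream()
    task = asyncio.create_task(stream_until_cancelled(client))
    await asyncio.sleep(0)   # let the task start streaming
    task.cancel()            # simulate application shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return client.closed


closed = asyncio.run(demo())
```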

Streams

Stock Streams

Stock Trades

async AsyncStreamClient.subscribe_stock_trades(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time trades for provided symbol(s)

Parameters:
  • symbols – A list of tickers to subscribe to. Defaults to ALL tickers.

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_stock_trades(symbols: list | None = None)

Unsubscribe from the stream for the supplied ticker symbols.

Parameters:

symbols – A list of tickers to unsubscribe from. Defaults to ALL tickers.

Returns:

None

Stock Quotes

async AsyncStreamClient.subscribe_stock_quotes(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time quotes for provided symbol(s)

Parameters:
  • symbols – A list of tickers to subscribe to. Defaults to ALL tickers.

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_stock_quotes(symbols: list | None = None)

Unsubscribe from the stream for the supplied ticker symbols.

Parameters:

symbols – A list of tickers to unsubscribe from. Defaults to ALL tickers.

Returns:

None

Stock Minute Aggregates (OHLCV)

async AsyncStreamClient.subscribe_stock_minute_aggregates(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time Minute Aggregates for provided symbol(s)

Parameters:
  • symbols – A list of tickers to subscribe to. Defaults to ALL tickers.

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_stock_minute_aggregates(symbols: list | None = None)

Unsubscribe from the stream for the supplied ticker symbols.

Parameters:

symbols – A list of tickers to unsubscribe from. Defaults to ALL tickers.

Returns:

None

Stock Second Aggregates (OHLCV)

async AsyncStreamClient.subscribe_stock_second_aggregates(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time Seconds Aggregates for provided symbol(s)

Parameters:
  • symbols – A list of tickers to subscribe to. Defaults to ALL tickers.

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_stock_second_aggregates(symbols: list | None = None)

Unsubscribe from the stream for the supplied ticker symbols.

Parameters:

symbols – A list of tickers to unsubscribe from. Defaults to ALL tickers.

Returns:

None

Stock Limit Up Limit Down (LULD)

async AsyncStreamClient.subscribe_stock_limit_up_limit_down(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time LULD Events for provided symbol(s)

Parameters:
  • symbols – A list of tickers to subscribe to. Defaults to ALL tickers.

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_stock_limit_up_limit_down(symbols: list | None = None)

Unsubscribe from the stream for the supplied ticker symbols.

Parameters:

symbols – A list of tickers to unsubscribe from. Defaults to ALL tickers.

Returns:

None

Stock Imbalances

async AsyncStreamClient.subscribe_stock_imbalances(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time Imbalance Events for provided symbol(s)

Parameters:
  • symbols – A list of tickers to subscribe to. Defaults to ALL tickers.

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_stock_imbalances(symbols: list | None = None)

Unsubscribe from the stream for the supplied ticker symbols.

Parameters:

symbols – A list of tickers to unsubscribe from. Defaults to ALL tickers.

Returns:

None

Options Streams

Options Trades

async AsyncStreamClient.subscribe_option_trades(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time options trades for provided ticker(s)

Parameters:
  • symbols – A list of tickers to subscribe to. Defaults to ALL tickers. You can specify them with or without the prefix O:

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_option_trades(symbols: list | None = None)

Unsubscribe from the stream for the supplied option symbols.

Parameters:

symbols – A list of symbols to unsubscribe from. Defaults to ALL tickers. You can specify with or without the prefix O:

Returns:

None

Options Quotes

async AsyncStreamClient.subscribe_option_quotes(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time options quotes for provided ticker(s)

Parameters:
  • symbols – A list of tickers to subscribe to. Defaults to ALL tickers. You can specify them with or without the prefix O:

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_option_quotes(symbols: list | None = None)

Unsubscribe from the stream for the supplied option symbols.

Parameters:

symbols – A list of symbols to unsubscribe from. Defaults to ALL tickers. You can specify with or without the prefix O:

Returns:

None

Options Minute Aggregates (OHLCV)

async AsyncStreamClient.subscribe_option_minute_aggregates(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time options minute aggregates for given ticker(s)

Parameters:
  • symbols – A list of tickers to subscribe to. Defaults to ALL tickers. You can specify them with or without the prefix O:

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_option_minute_aggregates(symbols: list | None = None)

Unsubscribe from the stream for the supplied option symbols.

Parameters:

symbols – A list of symbols to unsubscribe from. Defaults to ALL tickers. You can specify with or without the prefix O:

Returns:

None

Options Second Aggregates (OHLCV)

async AsyncStreamClient.subscribe_option_second_aggregates(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time options second aggregates for given ticker(s)

Parameters:
  • symbols – A list of tickers to subscribe to. Defaults to ALL tickers. You can specify them with or without the prefix O:

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_option_second_aggregates(symbols: list | None = None)

Unsubscribe from the stream for the supplied option symbols.

Parameters:

symbols – A list of symbols to unsubscribe from. Defaults to ALL tickers. You can specify with or without the prefix O:

Returns:

None

Forex Streams

Forex Quotes

async AsyncStreamClient.subscribe_forex_quotes(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time Forex Quotes for provided symbol(s)

Parameters:
  • symbols – A list of forex tickers. Default is * which subscribes to ALL tickers in the market. Each ticker must be in the format from/to. For example: USD/CNH.

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None
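A quick sanity check on the from/to format before subscribing can catch typos early. invalid_forex_tickers is a hypothetical helper, and the pattern assumes three-letter currency codes on both sides.

```python
import re

# Assumption: both sides of the pair are three-letter currency codes.
FOREX_PAIR = re.compile(r'^[A-Z]{3}/[A-Z]{3}$')


def invalid_forex_tickers(symbols):
    """Return the tickers that do not look like FROM/TO pairs."""
    return [s for s in symbols if not FOREX_PAIR.match(s.upper())]


rejected = invalid_forex_tickers(['USD/CNH', 'eur/usd', 'USDCNH', 'BTC-USD'])
```

Here rejected holds the two entries that lack the slash-separated form, so they can be fixed before any subscription call is made.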

async AsyncStreamClient.unsubscribe_forex_quotes(symbols: list | None = None)

Unsubscribe from the stream for the supplied forex symbols.

Parameters:

symbols – A list of forex tickers. Default is * which unsubscribes from ALL tickers in the market. Each ticker must be in the format from/to. For example: USD/CNH.

Returns:

None

Forex Minute Aggregates (OHLCV)

async AsyncStreamClient.subscribe_forex_minute_aggregates(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time Forex Minute Aggregates for provided symbol(s)

Parameters:
  • symbols – A list of forex tickers. Default is * which subscribes to ALL tickers in the market. Each ticker must be in the format from/to. For example: USD/CNH.

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_forex_minute_aggregates(symbols: list | None = None)

Unsubscribe from the stream for the supplied forex symbols.

Parameters:

symbols – A list of forex tickers. Default is * which unsubscribes from ALL tickers in the market. Each ticker must be in the format from/to. For example: USD/CNH.

Returns:

None

Crypto Streams

Crypto Trades

async AsyncStreamClient.subscribe_crypto_trades(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time Crypto Trades for provided symbol(s)

Parameters:
  • symbols – A list of crypto tickers. Default is * which subscribes to ALL tickers in the market. Each ticker must be in the format from-to. For example: BTC-USD. You can pass symbols with or without the prefix X:

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None
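Building crypto tickers from their parts keeps the from-to format consistent. crypto_ticker is a hypothetical helper, not a library function; the prefix is optional because the library accepts symbols with or without X:.

```python
def crypto_ticker(base, quote, with_prefix=True):
    """Join two currency codes into the from-to form, optionally prefixed with X:."""
    pair = f'{base}-{quote}'.upper()
    return f'X:{pair}' if with_prefix else pair


symbols = [crypto_ticker('btc', 'usd'), crypto_ticker('eth', 'usd', with_prefix=False)]
```

The resulting list can be passed as the symbols argument of any of the crypto subscribe methods.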

async AsyncStreamClient.unsubscribe_crypto_trades(symbols: list | None = None)

Unsubscribe from the stream for the supplied crypto symbols.

Parameters:

symbols – A list of crypto tickers. Default is * which unsubscribes from ALL tickers in the market. Each ticker must be in the format from-to. For example: BTC-USD. You can pass symbols with or without the prefix X:

Returns:

None

Crypto Quotes

async AsyncStreamClient.subscribe_crypto_quotes(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time Crypto Quotes for provided symbol(s)

Parameters:
  • symbols – A list of crypto tickers. Default is * which subscribes to ALL tickers in the market. Each ticker must be in the format from-to. For example: BTC-USD. You can pass symbols with or without the prefix X:

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_crypto_quotes(symbols: list | None = None)

Unsubscribe from the stream for the supplied crypto symbols.

Parameters:

symbols – A list of crypto tickers. Default is * which unsubscribes from ALL tickers in the market. Each ticker must be in the format from-to. For example: BTC-USD. You can pass symbols with or without the prefix X:

Returns:

None

Crypto Minute Aggregates (OHLCV)

async AsyncStreamClient.subscribe_crypto_minute_aggregates(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time Crypto Minute Aggregates for provided symbol(s)

Parameters:
  • symbols – A list of crypto tickers. Default is * which subscribes to ALL tickers in the market. Each ticker must be in the format from-to. For example: BTC-USD. You can pass symbols with or without the prefix X:

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_crypto_minute_aggregates(symbols: list | None = None)

Unsubscribe from the stream for the supplied crypto symbols.

Parameters:

symbols – A list of crypto tickers. Default is * which unsubscribes from ALL tickers in the market. Each ticker must be in the format from-to. For example: BTC-USD. You can pass symbols with or without the prefix X:

Returns:

None

Crypto Level 2 Book

async AsyncStreamClient.subscribe_crypto_level2_book(symbols: list | None = None, handler_function=None, force_uppercase_symbols: bool = True)

Get Real time Crypto Level 2 Book Data for provided symbol(s)

Parameters:
  • symbols – A list of crypto tickers. Default is * which subscribes to ALL tickers in the market. Each ticker must be in the format from-to. For example: BTC-USD. You can pass symbols with or without the prefix X:

  • handler_function – The function which you’d want to call to process messages received from this subscription. Defaults to None which uses the default process message function.

  • force_uppercase_symbols – Set to False if you don’t want the library to make all symbols upper case

Returns:

None

async AsyncStreamClient.unsubscribe_crypto_level2_book(symbols: list | None = None)

Unsubscribe from the stream for the supplied crypto symbols.

Parameters:

symbols – A list of crypto tickers. Default is * which unsubscribes from ALL tickers in the market. Each ticker must be in the format from-to. For example: BTC-USD. You can pass symbols with or without the prefix X:

Returns:

None