Stream data to AWS Kinesis

Connect to a Polygon websocket to receive messages for AAPL trades and stream them into AWS Kinesis.

In [16]:
import boto3
from boto import kinesis
import getpass
import json
import time
from polygon import WebSocketClient, STOCKS_CLUSTER

Use the access and secret keys for your AWS account

In [18]:
# Prompt for the AWS access key ID without echoing it; the explicit prompt
# distinguishes this input from the other secrets requested below.
ACCESS_KEY = getpass.getpass("AWS access key ID: ")
In [19]:
# Prompt for the AWS secret access key without echoing it; the explicit prompt
# makes clear which secret is being requested.
SECRET_KEY = getpass.getpass("AWS secret access key: ")

Generate a session token with the AWS CLI command:

aws sts get-session-token --duration-seconds 129600

In [21]:
# Prompt for the temporary session token (see the `aws sts get-session-token`
# command above) without echoing it.
SESSION_TOKEN = getpass.getpass("AWS session token: ")

Input your KEY_ID for Alpaca/Polygon

In [11]:
# Alpaca/Polygon KEY_ID, read without echoing; it is passed to the Polygon
# WebSocketClient further down.
KEY_ID = getpass.getpass("Alpaca/Polygon KEY_ID: ")
In [39]:
# boto3 Kinesis client authenticated with the temporary credentials entered
# above. The region is set explicitly so the client targets the same region as
# the legacy boto connection ("us-east-1") and does not depend on a locally
# configured default region.
client = boto3.client(
    'kinesis',
    region_name='us-east-1',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    aws_session_token=SESSION_TOKEN
)
In [41]:
# NOTE(review): this rebinds `client` to a client created without explicit
# credentials, replacing the one built from ACCESS_KEY / SECRET_KEY /
# SESSION_TOKEN in the preceding cell. With no credentials passed, boto3 falls
# back to its default credential lookup. Only one of the two cells is needed --
# confirm which is intended.
client = boto3.client('kinesis')
In [22]:
# The connection is bound to the name `kinesis`, which shadows the module
# imported with `from boto import kinesis`. Resolving the module through its
# fully-qualified name keeps this cell re-runnable: after the first run,
# `kinesis.connect_to_region` would be looked up on the connection object
# instead of the module and fail.
import boto.kinesis

# Legacy boto Kinesis connection used by the rest of the notebook.
# NOTE(review): only the region is passed here, so the credentials entered
# above are not handed to this connection -- presumably boto picks them up from
# the environment or its config files; confirm.
kinesis = boto.kinesis.connect_to_region("us-east-1")
In [ ]:
# Request the description of the stream that the trade events are written to,
# as a check that it is reachable through this connection.
kinesis.describe_stream("polygon-trades-aapl")
In [24]:
# List the streams visible through this connection; the target stream
# "polygon-trades-aapl" is expected under the 'StreamNames' key of the result.
kinesis.list_streams()
Out[24]:
{'HasMoreStreams': False, 'StreamNames': ['polygon-trades-aapl']}
In [44]:
# Callback that runs every time a message is received from the websocket.
def process_event(message, stream_name="polygon-trades-aapl", partition_key="partitionkey"):
    """Forward the trade events contained in one websocket message to Kinesis.

    Args:
        message: JSON text received from the websocket. It normally decodes to
            a list of event objects; a single event object is also accepted.
        stream_name: Name of the Kinesis stream the records are written to.
        partition_key: Partition key sent with every record. With the constant
            default, every record carries the same key.

    Returns:
        None. Each event whose 'ev' field equals 'T' is re-serialised to JSON
        and written with ``kinesis.put_record``; all other events are ignored.

    Raises:
        ValueError: If ``message`` is not valid JSON (raised by ``json.loads``).
    """
    payload = json.loads(message)

    # Normalise the decoded payload to a list of events. Anything that is
    # neither a list nor a single event object carries no events to forward.
    if isinstance(payload, dict):
        payload = [payload]
    elif not isinstance(payload, list):
        return

    # There can be one or more events per message.
    for event in payload:
        # Use .get() so events without an 'ev' field (or non-object entries)
        # are skipped instead of raising inside the websocket callback.
        if isinstance(event, dict) and event.get('ev') == 'T':
            kinesis.put_record(stream_name, json.dumps(event), partition_key)
        
# How long (in seconds) to keep the AAPL trade subscription open.
STREAM_SECONDS = 5

# Create the Polygon websocket client; process_event is the callback that
# handles each received message.
my_client = WebSocketClient(STOCKS_CLUSTER, KEY_ID, process_event)
my_client.run_async()

try:
    # Subscribe to the AAPL trade channel and let events flow for
    # STREAM_SECONDS seconds.
    my_client.subscribe("T.AAPL")
    time.sleep(STREAM_SECONDS)
finally:
    # Always close the websocket once the streaming window is over, including
    # when the cell is interrupted or subscribing fails, so the connection
    # does not stay open and keep forwarding events.
    my_client.close_connection()