We’re going to be populating our equity backtesting database with stock market data from Intrinio. Intrinio provides access to its data through both CSV bulk downloads and APIs.
In this article, I’m going to cover importing the data using the Intrinio Python SDK. If you’re new to Python, don’t worry. I’m going to cover everything that you’ll need to know.
For a detailed account of my personal experiences and opinions about this provider, please refer to my comprehensive Intrinio Review.
Now let’s get some stock data!
Sign Up for Intrinio
Intrinio provides a free developer sandbox environment for you to test their services. To get one, go to intrinio.com/signup. Once registered, you’ll have both a production and a sandbox API key. I’ll be providing code for both a production and sandbox environment for demonstration purposes.
The Intrinio API
API stands for Application Programming Interface. APIs are what we use when we want to communicate with another program programmatically. Intrinio has made our lives easier by creating an open-source Python SDK to interact with their API and including great API documentation.
Install the Intrinio Python SDK
Installing the Intrinio SDK is easy: activate your Python environment, then install the package with pip.
Please keep in mind that this environment should be dedicated to your research. If you're using one environment for multiple applications, mixing pip installs into a conda environment can cause problems.
conda activate conda_env_name
pip install intrinio-sdk
With the intrinio-sdk installed, we can now import it and request data from the APIs.
Store Configuration Data
Let’s store our configuration data in a YAML file and create a separate Python file to set up our connection. You’ll also need to make sure you’ve created an equities database in Python (see the create-an-equities-database post).
Add your API keys to setup_intrinio_environment.yaml.
PRODUCTION_API_KEY: your_production_api_key
SANDBOX_API_KEY: your_sandbox_api_key
USE_PRODUCTION: FALSE
Create setup_intrinio_environment.py to create a connection to your production or sandbox environment.
import intrinio_sdk
from intrinio_sdk.rest import ApiException
import yaml

config_file_name = 'setup_intrinio_environment.yaml'


def get_connection():
    with open(config_file_name, 'r') as f:
        vals = yaml.safe_load(f)
    if not ('PRODUCTION_API_KEY' in vals and
            'SANDBOX_API_KEY' in vals and
            'USE_PRODUCTION' in vals):
        raise Exception('Bad config file: ' + config_file_name)
    if vals['USE_PRODUCTION'] is True:
        api_key = vals['PRODUCTION_API_KEY']
        print("Using Production API")
    else:
        api_key = vals['SANDBOX_API_KEY']
        print("Using Sandbox API")
    intrinio_sdk.ApiClient().configuration.api_key['api_key'] = api_key
    return intrinio_sdk


def using_production():
    with open(config_file_name, 'r') as f:
        vals = yaml.safe_load(f)
    if not ('PRODUCTION_API_KEY' in vals and
            'SANDBOX_API_KEY' in vals and
            'USE_PRODUCTION' in vals):
        raise Exception('Bad config file: ' + config_file_name)
    return vals['USE_PRODUCTION'] is True
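The key-presence check above can also be written as a set difference, which reports exactly which keys are missing instead of a generic failure. A minimal sketch, using a plain dict in place of the parsed YAML:

```python
# The three keys the setup script requires in its YAML config.
REQUIRED_KEYS = {'PRODUCTION_API_KEY', 'SANDBOX_API_KEY', 'USE_PRODUCTION'}


def validate_config(vals):
    """Return the set of required keys missing from a parsed config dict."""
    return REQUIRED_KEYS - vals.keys()


# A complete config passes; an incomplete one names what's missing.
ok = {'PRODUCTION_API_KEY': 'x', 'SANDBOX_API_KEY': 'y', 'USE_PRODUCTION': False}
bad = {'SANDBOX_API_KEY': 'y'}

print(validate_config(ok))   # set() -> nothing missing
print(validate_config(bad))  # {'PRODUCTION_API_KEY', 'USE_PRODUCTION'}
```

Raising with the missing-key set in the message makes a bad config file much quicker to debug than the all-or-nothing check.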
Create Import Script
I’m going to cover the sandbox-related code first and then include the production code and suggested changes at the end. Please keep in mind that the code isn’t optimized: I’m purposely showing multiple ways to perform various operations as a learning tool. Your production code should look different, but this will give you one heck of a head start!
Add Libraries & Establish Connection
We’ll grab the usual suspects so we can use them in our code. You will need to adjust your BULK_API_CALL_LIMIT to what makes sense for your subscription. Also, I’m purposely leaving out StockAdjustments in this post as a task for you to complete; however, I’ll provide the solution in the GitHub repo shortly.
from datetime import datetime, date, timedelta
import time
import pandas as pd
import numpy as np
from intrinio_sdk.rest import ApiException
from sqlalchemy import func, or_, and_
from sqlalchemy.orm import sessionmaker
from models import Company, Security, SecurityPrice, StockAdjustment, Exchange
import setup_psql_environment
import setup_intrinio_environment
BULK_API_CALL_LIMIT = 2
# Setup environment and create a session
db = setup_psql_environment.get_database()
Session = sessionmaker(bind=db)
session = Session()
intrinio_sdk = setup_intrinio_environment.get_connection()
production = setup_intrinio_environment.using_production()
# If production, add SP500 securities not already in database
# If sandbox, add all securities available not in database
query = session.query(Security).statement
existing_securities = pd.read_sql(query, db, index_col='id')
security_api = intrinio_sdk.SecurityApi()
company_api = intrinio_sdk.CompanyApi()
Getting the Securities
Getting the securities in the sandbox environment is easy. We’ll ask for all of the securities using get_all_securities. This is a large request, and the api_response will span multiple pages, so we’ll append each page to the dataframe using pd.concat. Once we have the list of all of the stocks, we’ll filter out any existing_securities that are already in our database. Finally, once we have all of the securities_to_add, instead of inserting them one by one into our database, we’ll perform a bulk insert.
api_response = get_all_securities()
new_securities = pd.DataFrame(api_response.securities_dict)
while api_response.next_page:
    api_response = get_all_securities(next_page=api_response.next_page)
    page = pd.DataFrame(api_response.securities_dict)
    new_securities = pd.concat([new_securities, page])
columns = ['id', 'code', 'currency', 'ticker', 'name', 'figi',
           'composite_figi', 'share_class_figi']
new_securities = new_securities[columns]
new_securities.rename(columns={'id': 'id_intrinio'}, inplace=True)
securities_to_add = new_securities[~new_securities['figi']
                                   .isin(existing_securities['figi'])]
if len(securities_to_add) > 0:
    print("Adding {security_count} securities."
          .format(security_count=len(securities_to_add)))
    session.bulk_insert_mappings(Security, securities_to_add
                                 .to_dict(orient="records"))
    session.commit()
else:
    print("No securities added.")
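Two details in the loop above are easy to get wrong: pd.concat returns a new DataFrame rather than modifying its arguments, so the result must be assigned back, and the `~...isin(...)` mask is an anti-join against what’s already stored. A toy sketch with made-up pages (the FIGI values are illustrative only):

```python
import pandas as pd

# Two fake "pages" standing in for successive API responses.
page1 = pd.DataFrame({'figi': ['BBG000B9XRY4', 'BBG000BKZB36'],
                      'ticker': ['AAPL', 'HD']})
page2 = pd.DataFrame({'figi': ['BBG000BMHYD1', 'BBG000B9XRY4'],
                      'ticker': ['JNJ', 'AAPL']})

# pd.concat returns a new frame; assign it back or the page is lost.
securities = pd.concat([page1, page2], ignore_index=True)

# Anti-join: keep only rows whose figi is not already in the database.
existing = pd.DataFrame({'figi': ['BBG000B9XRY4']})
to_add = securities[~securities['figi'].isin(existing['figi'])]
print(to_add['ticker'].tolist())  # ['HD', 'JNJ']
```

Filtering on figi rather than ticker matters because tickers get reused across securities, while FIGIs are stable identifiers.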
Adding the Exchange Data
Now that we have our securities, we can add the exchanges. The logic is very similar to adding the securities. Updating each security with an exchange is where we see something new. We perform an outerjoin to combine the Security and Exchange database tables to determine which securities do not have an associated exchange. We then populate the security data with an exchange. Notice how we’re using both Pandas and SQLAlchemy for our inserts and updates.
# Get Exchanges
stock_exchange_api = intrinio_sdk.StockExchangeApi()
api_response = get_all_exchanges()
exchanges = pd.DataFrame(api_response.stock_exchanges_dict)
while api_response.next_page:
    api_response = get_all_exchanges(api_response.next_page)
    page = pd.DataFrame(api_response.stock_exchanges_dict)
    exchanges = pd.concat([exchanges, page])
exchanges.rename(columns={'id': 'id_intrinio'}, inplace=True)
query = session.query(Exchange).statement
existing_exchanges = pd.read_sql(query, db, index_col='id')
exchanges = exchanges[~exchanges['mic'].isin(existing_exchanges['mic'])]
if len(exchanges) > 0:
    print("Inserting {length} exchanges.".format(length=len(exchanges)))
    session.bulk_insert_mappings(Exchange, exchanges.to_dict(orient="records"))
    session.commit()
else:
    print("No exchanges added.")

# Update securities with exchange
securities = session.query(Security).outerjoin(Exchange).filter(
    Security.exchange_id.is_(None))
if securities.count() > 0:
    query = session.query(Exchange).statement
    exchanges = pd.read_sql(query, db, index_col='id')
    for security in securities:
        api_response = get_security(security.id_intrinio)
        if api_response:
            security.exchange_id = int(
                exchanges[exchanges['mic'] ==
                          api_response.listing_exchange_mic].index[0])
    print("Updating {length} securities with an exchange."
          .format(length=securities.count()))
    session.commit()
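One subtlety worth calling out: the null check in the filter must use SQLAlchemy’s `Security.exchange_id.is_(None)` (or `== None`), because Python’s plain `is None` compares the column object itself and never produces an `IS NULL` clause. A minimal in-memory sketch of finding securities without an exchange, with toy models standing in for the real `Security` and `Exchange` tables:

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Exchange(Base):
    __tablename__ = 'exchanges'
    id = Column(Integer, primary_key=True)
    mic = Column(String)


class Security(Base):
    __tablename__ = 'securities'
    id = Column(Integer, primary_key=True)
    ticker = Column(String)
    exchange_id = Column(Integer, ForeignKey('exchanges.id'))


engine = create_engine('sqlite://')  # throwaway in-memory database
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add_all([
    Exchange(id=1, mic='XNAS'),
    Security(ticker='AAPL', exchange_id=1),
    Security(ticker='MSFT'),  # no exchange assigned yet
])
session.commit()

# .is_(None) renders as "exchange_id IS NULL" in the emitted SQL.
missing = session.query(Security).filter(Security.exchange_id.is_(None)).all()
print([s.ticker for s in missing])  # ['MSFT']
```

The same pattern applies to the real query; the outerjoin simply lets you pull exchange columns alongside if you need them.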
Adding the Company Data
To get the company data, we create a filter in SQLAlchemy and convert it into an SQL statement that Pandas can read with pd.read_sql. We loop through all of the companies and use the api_response to populate the company data. We add the companies one by one, and for any security whose company can’t be found, we set Security.has_missing_company to True so we don’t repeat the search unnecessarily on every run.
# Get Companies
query = session.query(Security).outerjoin(Company).filter(
    and_(Company.security_id == None,
         Security.has_missing_company.isnot(True))).statement
securities_without_company = pd.read_sql(query, db, index_col='id')
securities_without_company_data = []
for index, security in securities_without_company.iterrows():
    api_response = get_company(security.ticker)
    if not api_response:
        securities_without_company_data.append(security.ticker)
    else:
        company = Company(name=api_response.name,
                          cik=api_response.cik,
                          description=(api_response.short_description or '')[:2000],
                          company_url=api_response.company_url,
                          sic=api_response.sic,
                          employees=api_response.employees,
                          sector=api_response.sector,
                          industry_category=api_response.industry_category,
                          industry_group=api_response.industry_group,
                          security_id=index)
        print("Adding company {name}.".format(name=api_response.name))
        session.add(company)
        session.commit()
length = (len(securities_without_company) - len(securities_without_company_data))
print("Added {length} companies.".format(length=length))
if len(securities_without_company_data) > 0:
    securities_without_company = securities_without_company.loc[
        securities_without_company['ticker'].isin(securities_without_company_data)]
    securities_without_company['has_missing_company'] = True
    securities_without_company['id'] = securities_without_company.index
    session.bulk_update_mappings(Security,
                                 securities_without_company.to_dict(orient="records"))
    session.commit()
    print("There were {rows} new rows that did not have an associated company record."
          .format(rows=len(securities_without_company_data)))
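The description column is capped at 2,000 characters, and a conditional truncation is more verbose than it needs to be: slicing past the end of a string is safe in Python, and guarding with `or ''` also covers a missing description. A small sketch:

```python
def truncate_description(text, limit=2000):
    """Clamp a possibly-None description to the column's length limit."""
    return (text or '')[:limit]


print(len(truncate_description('x' * 5000)))  # 2000
print(truncate_description(None))             # ''
print(truncate_description('short'))          # 'short'
```

This avoids a TypeError when the API returns no short_description for a company, which does happen for thinly covered tickers.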
Retrieving the Price History
We can now import our price data. We check when we last updated our price data with an outerjoin, convert any NaN values to None so we can save them into PostgreSQL, and then bulk insert the rows.
# Get Updated Prices
query = session.query(Security, func.max(SecurityPrice.date).label("latest_date")) \
    .outerjoin(SecurityPrice).group_by(Security.id) \
    .filter(Security.has_invalid_data.isnot(True)).statement
securities = pd.read_sql(query, db)
invalid_data_ids = []
for index, security in securities.iterrows():
    start_date = security.latest_date + timedelta(days=1) if security.latest_date else None
    api_response = get_security_prices(security.figi, start_date)
    if api_response:
        stock_prices = pd.DataFrame(api_response.stock_prices_dict)
        stock_prices['security_id'] = security.id
        while api_response.next_page:
            api_response = get_security_prices(security.figi, start_date,
                                               api_response.next_page)
            page = pd.DataFrame(api_response.stock_prices_dict)
            page['security_id'] = security.id
            stock_prices = pd.concat([stock_prices, page])
        stock_prices_to_add = stock_prices[~stock_prices.security_id.isin(invalid_data_ids)]
        stock_prices_to_add = stock_prices_to_add.replace({np.nan: None})
        if len(stock_prices_to_add) > 0:
            start_date = stock_prices_to_add['date'].min()
            end_date = stock_prices_to_add['date'].max()
            print("Ticker {ticker}: Adding {rows} rows to the security prices "
                  "database with dates between {start_date} - {end_date}."
                  .format(ticker=security.ticker, rows=len(stock_prices_to_add),
                          start_date=start_date, end_date=end_date))
            session.bulk_insert_mappings(SecurityPrice,
                                         stock_prices_to_add.to_dict(orient="records"))
            session.commit()
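PostgreSQL stores missing values as NULL while pandas uses NaN, so the frame has to be converted before the bulk insert: `replace({np.nan: None})` returns an object-dtype frame whose missing entries are real None values the database driver can handle. Note it returns a new frame rather than mutating in place, so assign the result. A toy example:

```python
import numpy as np
import pandas as pd

# A price frame with a missing row, as an API gap might produce.
prices = pd.DataFrame({'close': [189.5, np.nan],
                       'volume': [1000.0, np.nan]})

# replace() returns a new frame; assigning avoids chained-assignment issues
# when the source frame is itself a filtered slice.
cleaned = prices.replace({np.nan: None})

print(cleaned['close'].iloc[1] is None)   # True
records = cleaned.to_dict(orient='records')
print(records[1])                          # {'close': None, 'volume': None}
```

The `to_dict(orient="records")` output is exactly what bulk_insert_mappings consumes, so the None values flow straight through to NULL columns.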
What about Production?
The only difference between this code for the sandbox and the production code is how we get the securities. We’ll start by providing our script with the S&P500 Historical Components and Changes. We then search for the security instead of just looking it up by ticker as multiple securities share tickers. If you’ve graduated from the sandbox environment and are building a production backtesting database, you’ll be able to read the code for the details you need. Again, this isn’t optimized, and you may want to have multiple data sources for verification and improve upon the searching.
Additionally, once you’ve imported the securities, you’ll want to open up Jupyter notebook and validate the data in Pandas.
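The validation worth running first is simple: duplicate keys, missing values, and date coverage per security. A hedged sketch of checks you might run in a notebook, using a toy frame in place of a real query against SecurityPrice:

```python
import pandas as pd

prices = pd.DataFrame({
    'security_id': [1, 1, 2, 2, 2],
    'date': pd.to_datetime(['2024-01-02', '2024-01-03',
                            '2024-01-02', '2024-01-02', '2024-01-04']),
    'close': [189.5, 190.1, 45.2, 45.2, None],
})

# One row per security per day -- duplicates usually mean a double import.
dupes = prices.duplicated(subset=['security_id', 'date']).sum()
print('duplicate rows:', dupes)                          # 1

# Missing closes are worth investigating before backtesting on them.
missing_closes = prices['close'].isna().sum()
print('missing closes:', missing_closes)                 # 1

# Date coverage per security: spot short histories at a glance.
print(prices.groupby('security_id')['date'].agg(['min', 'max', 'count']))
```

Running these against the real table (via pd.read_sql on a SecurityPrice query) catches most import mistakes before they contaminate a backtest.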
The Code
Here’s the full code listing. Please play around and experiment with it, and please let me know if there’s anything you would like me to modify or update. Happy hacking!
from datetime import datetime, date, timedelta
import time
import pandas as pd
import numpy as np
from intrinio_sdk.rest import ApiException
from sqlalchemy import func, or_, and_
from sqlalchemy.orm import sessionmaker
from models import Company, Security, SecurityPrice, StockAdjustment, Exchange
import setup_psql_environment
import setup_intrinio_environment
BULK_API_CALL_LIMIT = 3
def get_all_exchanges(next_page=''):
    # https://docs.intrinio.com/documentation/python/get_all_stock_exchanges_v2
    page_size = 100
    try:
        api_response = stock_exchange_api.get_all_stock_exchanges(
            page_size=page_size,
            next_page=next_page)
    except ApiException as e:
        print("Exception: StockExchangeApi->get_all_stock_exchanges: %s\r\n" % e)
        return None
    return api_response


def get_all_securities(next_page=''):
    # https://docs.intrinio.com/documentation/python/get_all_securities_v2
    active = True
    delisted = False
    currency = 'USD'
    composite_mic = 'USCOMP'
    try:
        api_response = security_api.get_all_securities(
            active=active,
            delisted=delisted,
            currency=currency,
            composite_mic=composite_mic,
            next_page=next_page)
    except ApiException as e:
        print("Exception: SecurityApi->get_all_securities: %s\r\n" % e)
        return None
    return api_response


def get_security(identifier):
    # https://docs.intrinio.com/documentation/python/get_security_by_id_v2
    try:
        api_response = security_api.get_security_by_id(identifier)
    except ApiException as e:
        print("Error trying to get data for ", identifier)
        print("Exception: SecurityApi->get_security_by_id: %s\r\n" % e)
        return None
    return api_response


def get_security_prices(identifier, start_date='', next_page=''):
    # https://docs.intrinio.com/documentation/python/get_security_stock_prices_v2
    frequency = 'daily'
    # No start date means we're backfilling full history, so request big
    # pages and throttle to respect the bulk API call limit.
    page_size = 100 if start_date else 10000
    try:
        api_response = security_api.get_security_stock_prices(
            identifier,
            start_date=start_date,
            frequency=frequency,
            page_size=page_size,
            next_page=next_page)
        if page_size > 100:
            time.sleep(BULK_API_CALL_LIMIT)
    except ApiException as e:
        print("Exception: SecurityApi->get_security_stock_prices: %s\r\n" % e)
        return None
    return api_response


def get_company(identifier):
    # https://docs.intrinio.com/documentation/python/get_company_v2
    try:
        api_response = company_api.get_company(identifier)
    except ApiException as e:
        print("Exception: CompanyApi->get_company: %s\r\n" % e)
        return None
    return api_response
# Setup environment and create a session
db = setup_psql_environment.get_database()
Session = sessionmaker(bind=db)
session = Session()
intrinio_sdk = setup_intrinio_environment.get_connection()
production = setup_intrinio_environment.using_production()
# If production, add SP500 securities not already in database
# If sandbox, add all securities available not in database
query = session.query(Security).statement
existing_securities = pd.read_sql(query, db, index_col='id')
security_api = intrinio_sdk.SecurityApi()
company_api = intrinio_sdk.CompanyApi()
if production:
    # Get S&P500 constituents
    sp500_constituents = pd.read_csv("sp500_constituents.csv",
                                     dtype={'cik': object})
    securities_to_add = sp500_constituents[~sp500_constituents['ticker']
                                           .isin(existing_securities['ticker'])]
    # Look up each constituent and compare ticker to company name.
    missing_securities = []
    strings_to_remove = ['limited', 'ltd', 'incorporated', 'inc', '.']
    for index, sp500_constituent in securities_to_add.iterrows():
        sp500_constituent.replace(np.nan, '', inplace=True)
        name = sp500_constituent['name'].lower()
        ticker = sp500_constituent['ticker'].upper()
        cik = sp500_constituent['cik']
        if cik:
            try:
                api_response = company_api.search_companies(cik)
            except ApiException as e:
                print("Exception: CompanyApi->search_companies: %s\r\n" % e)
                continue
            if api_response:
                for company in api_response.companies:
                    if company.ticker and company.ticker.upper() == ticker:
                        name = company.name
                        break
        else:
            for string in strings_to_remove:
                name = name.replace(string, '')
        query = name + ' ' + ticker
        try:
            api_response = security_api.search_securities(query)
        except ApiException as e:
            print("Exception: SecurityApi->search_securities: %s\r\n" % e)
            continue
        if api_response:
            match_found = False
            for security in api_response.securities:
                if (security.ticker and security.code == 'EQS'
                        and security.ticker.upper() == ticker):
                    match_found = True
                    api_response = get_security(security.id)
                    if api_response:
                        stock = Security(
                            id_intrinio=api_response.id,
                            code=api_response.code,
                            currency=api_response.currency,
                            ticker=api_response.ticker,
                            name=api_response.name,
                            figi=api_response.figi,
                            composite_figi=api_response.composite_figi,
                            share_class_figi=api_response.share_class_figi
                        )
                        print("Adding security {name} with ticker: {ticker}."
                              .format(name=stock.name, ticker=stock.ticker))
                        session.add(stock)
                        session.commit()
                    break
            if not match_found:
                print("\nNo match found for query: {query}\n"
                      .format(query=query))
                missing_securities.append(query)
        else:
            print("No API response for: ", query)
            missing_securities.append(query)
    print('There were {length} missing securities. Trying search with larger page size.'
          .format(length=len(missing_securities)))
    for query in missing_securities:
        # Each query string was built as "name ticker", so the ticker is
        # the last token.
        ticker = query.rsplit(' ', 1)[-1].upper()
        try:
            api_response = security_api.search_securities(
                query,
                page_size=10000)
            time.sleep(BULK_API_CALL_LIMIT)
        except ApiException as e:
            print("Exception: SecurityApi->search_securities: %s\r\n" % e)
            continue
        if api_response:
            match_found = False
            for security in api_response.securities:
                if (security.ticker and security.code == 'EQS'
                        and security.ticker.upper() == ticker):
                    match_found = True
                    api_response = get_security(security.id)
                    if api_response:
                        stock = Security(
                            id_intrinio=api_response.id,
                            code=api_response.code,
                            currency=api_response.currency,
                            ticker=api_response.ticker,
                            name=api_response.name,
                            figi=api_response.figi,
                            composite_figi=api_response.composite_figi,
                            share_class_figi=api_response.share_class_figi
                        )
                        print(query)
                        print("Adding security {name} with ticker: {ticker}.\n"
                              .format(name=stock.name, ticker=stock.ticker))
                        session.add(stock)
                        session.commit()
                    break
            if not match_found:
                print("A match was not found for query: ", query)
        else:
            print("NO API RESPONSE FOR: ", query)
else:
    api_response = get_all_securities()
    new_securities = pd.DataFrame(api_response.securities_dict)
    while api_response.next_page:
        api_response = get_all_securities(next_page=api_response.next_page)
        page = pd.DataFrame(api_response.securities_dict)
        new_securities = pd.concat([new_securities, page])
    columns = ['id', 'code', 'currency', 'ticker', 'name', 'figi',
               'composite_figi', 'share_class_figi']
    new_securities = new_securities[columns]
    new_securities.rename(columns={'id': 'id_intrinio'}, inplace=True)
    securities_to_add = new_securities[~new_securities['figi']
                                       .isin(existing_securities['figi'])]
    if len(securities_to_add) > 0:
        print("Adding {security_count} securities."
              .format(security_count=len(securities_to_add)))
        session.bulk_insert_mappings(Security, securities_to_add
                                     .to_dict(orient="records"))
        session.commit()
    else:
        print("No securities added.")
# Get Exchanges
stock_exchange_api = intrinio_sdk.StockExchangeApi()
api_response = get_all_exchanges()
exchanges = pd.DataFrame(api_response.stock_exchanges_dict)
while api_response.next_page:
    api_response = get_all_exchanges(api_response.next_page)
    page = pd.DataFrame(api_response.stock_exchanges_dict)
    exchanges = pd.concat([exchanges, page])
exchanges.rename(columns={'id': 'id_intrinio'}, inplace=True)
query = session.query(Exchange).statement
existing_exchanges = pd.read_sql(query, db, index_col='id')
exchanges = exchanges[~exchanges['mic'].isin(existing_exchanges['mic'])]
if len(exchanges) > 0:
    print("Inserting {length} exchanges.".format(length=len(exchanges)))
    session.bulk_insert_mappings(Exchange, exchanges.to_dict(orient="records"))
    session.commit()
else:
    print("No exchanges added.")

# Update securities with exchange
securities = session.query(Security).outerjoin(Exchange).filter(
    Security.exchange_id.is_(None))
if securities.count() > 0:
    query = session.query(Exchange).statement
    exchanges = pd.read_sql(query, db, index_col='id')
    for security in securities:
        api_response = get_security(security.id_intrinio)
        if api_response:
            security.exchange_id = int(
                exchanges[exchanges['mic'] ==
                          api_response.listing_exchange_mic].index[0])
    print("Updating {length} securities with an exchange."
          .format(length=securities.count()))
    session.commit()
# Get Companies
query = session.query(Security).outerjoin(Company).filter(
    and_(Company.security_id == None,
         Security.has_missing_company.isnot(True))).statement
securities_without_company = pd.read_sql(query, db, index_col='id')
securities_without_company_data = []
for index, security in securities_without_company.iterrows():
    api_response = get_company(security.ticker)
    if not api_response:
        securities_without_company_data.append(security.ticker)
    else:
        company = Company(name=api_response.name,
                          cik=api_response.cik,
                          description=(api_response.short_description or '')[:2000],
                          company_url=api_response.company_url,
                          sic=api_response.sic,
                          employees=api_response.employees,
                          sector=api_response.sector,
                          industry_category=api_response.industry_category,
                          industry_group=api_response.industry_group,
                          security_id=index)
        print("Adding company {name}.".format(name=api_response.name))
        session.add(company)
        session.commit()
length = (len(securities_without_company) - len(securities_without_company_data))
print("Added {length} companies.".format(length=length))
if len(securities_without_company_data) > 0:
    securities_without_company = securities_without_company.loc[
        securities_without_company['ticker'].isin(securities_without_company_data)]
    securities_without_company['has_missing_company'] = True
    securities_without_company['id'] = securities_without_company.index
    session.bulk_update_mappings(Security,
                                 securities_without_company.to_dict(orient="records"))
    session.commit()
    print("There were {rows} new rows that did not have an associated company record."
          .format(rows=len(securities_without_company_data)))
# Get Updated Prices
query = session.query(Security, func.max(SecurityPrice.date).label("latest_date")) \
    .outerjoin(SecurityPrice) \
    .group_by(Security.id) \
    .filter(Security.has_invalid_data.isnot(True)).statement
securities = pd.read_sql(query, db)
invalid_data_ids = []
for index, security in securities.iterrows():
    start_date = security.latest_date + timedelta(days=1) if security.latest_date else None
    api_response = get_security_prices(security.figi, start_date)
    if api_response:
        stock_prices = pd.DataFrame(api_response.stock_prices_dict)
        stock_prices['security_id'] = security.id
        while api_response.next_page:
            api_response = get_security_prices(security.figi, start_date,
                                               api_response.next_page)
            page = pd.DataFrame(api_response.stock_prices_dict)
            page['security_id'] = security.id
            stock_prices = pd.concat([stock_prices, page])
        stock_prices_to_add = stock_prices[~stock_prices.security_id.isin(invalid_data_ids)]
        stock_prices_to_add = stock_prices_to_add.replace({np.nan: None})
        if len(stock_prices_to_add) > 0:
            start_date = stock_prices_to_add['date'].min()
            end_date = stock_prices_to_add['date'].max()
            print("Ticker {ticker}: Adding {rows} rows to the security prices "
                  "database with dates between {start_date} - {end_date}."
                  .format(ticker=security.ticker, rows=len(stock_prices_to_add),
                          start_date=start_date, end_date=end_date))
            session.bulk_insert_mappings(SecurityPrice,
                                         stock_prices_to_add.to_dict(orient="records"))
            session.commit()