
How to Create a Zipline Equity Bundle

Importing custom data into Zipline can be tricky, especially for users new to Python and Pandas. I’m here to remedy that. In this guide, I’ll explain how to create, register and ingest a custom equity bundle so that you can use your own custom data in your equity research.

How to Create a Bundle

At a high level, here are the steps we’re going to take:

  1. Get good data
  2. Create a Python bundle file
  3. Register the bundle
  4. Use Zipline to ingest the bundle

There’s only one complicated step when writing a bundle, and that’s creating the Python bundle file.

Get Good Data

Selecting a data provider and getting good data is critical to your success when analyzing investment strategies. While this is a much deeper topic for another article, I’ll summarize the most common issues: equity benchmarks need to take into account survivorship bias, and individual equity prices need to be adjusted for dividends, splits, and spinoffs. And this says nothing about the quality or coverage of the data. If you already have a data provider, that’s great. If you don’t, there are plenty of providers to choose from; I use Quandl.

I’m going to assume you’ve already downloaded the data as a zip file on your computer for this guide; however, in practice, you’ll likely want to set up a securities database. You would also want to analyze the data before uploading to correct missing or inaccurate data with Pandas magic such as pandas.DataFrame.fillna.
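Before ingesting, a pass over the data to patch gaps pays off. A minimal sketch of the kind of Pandas cleanup mentioned above, using invented prices and a forward-fill per symbol (your own rules for what counts as fixable will differ):

```python
import pandas as pd

# Hypothetical raw prices with a missing close on 1999-11-19.
prices = pd.DataFrame({
    'symbol': ['A', 'A', 'A'],
    'date': pd.to_datetime(['1999-11-18', '1999-11-19', '1999-11-22']),
    'unadjusted_close': [44.00, None, 41.00],
})

# Forward-fill missing prices within each symbol so gaps carry
# the last known close instead of NaN.
prices['unadjusted_close'] = (
    prices.groupby('symbol')['unadjusted_close'].ffill()
)
print(prices['unadjusted_close'].tolist())  # [44.0, 44.0, 41.0]
```

`fillna` with a scalar or method argument works the same way when a per-group fill isn’t needed.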

Required Data

We need the following fields when creating an equities data bundle:

  • symbol
  • date
  • unadjusted_open
  • unadjusted_high
  • unadjusted_low
  • unadjusted_close
  • unadjusted_volume
  • splits
  • dividends
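A minimal CSV carrying these fields might look like the following (the values are made up for illustration):

```python
import io
import pandas as pd

# A made-up two-row sample containing the nine required fields.
csv = """symbol,date,unadjusted_open,unadjusted_high,unadjusted_low,unadjusted_close,unadjusted_volume,splits,dividends
A,1999-11-18,45.50,50.00,40.00,44.00,4473990,1.0,0.0
A,1999-11-19,42.94,43.00,39.81,40.38,1089710,1.0,0.0
"""

df = pd.read_csv(io.StringIO(csv), parse_dates=['date'])
print(df.shape)  # (2, 9)
```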

Creating the Bundle File

The bundle file is a python file that defines an ingest function with a specific signature that Zipline uses when running the zipline ingest command. Within the ingest function, we perform the following:

  1. Format the data
  2. Create the metadata
  3. Write the daily bars
  4. Store the adjustments
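Zipline calls the ingest function with a fixed set of arguments; the four steps map onto it roughly like this skeleton (argument names follow the full bundle file at the end of this article):

```python
def ingest(environ,
           asset_db_writer,
           minute_bar_writer,
           daily_bar_writer,
           adjustment_writer,
           calendar,
           start_session,
           end_session,
           cache,
           show_progress,
           output_dir):
    # 1. Format the data       -> load_data_table(...)
    # 2. Create the metadata   -> gen_asset_metadata(...) + asset_db_writer.write(...)
    # 3. Write the daily bars  -> daily_bar_writer.write(...)
    # 4. Store the adjustments -> adjustment_writer.write(...)
    pass
```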

Format the Data

In the load_data_table function, I read the zipfile, grab the file within the zipfile, and then create the dataframe with only the columns we need. I then rename the columns and return the dataframe in the required format.

I read the zipfile using the with statement to ensure that the file is closed after we get the data. I also assert that my zipfile only contains one file; however, be mindful that your zipfile may be set up differently.

def load_data_table(file, show_progress=show_progress):
    # Load data table from Quandl.zip file.
    with ZipFile(file) as zip_file:
        file_names = zip_file.namelist()
        assert len(file_names) == 1, "Expected a single file"
        quandl_prices = file_names.pop()
        with zip_file.open(quandl_prices) as table_file:
            if show_progress:
                log.info('Parsing raw data.')
            data_table = pd.read_csv(table_file,
                                     names=column_names,
                                     index_col=False,
                                     usecols=[
                                         0, 1, 2, 3, 4, 5, 6, 7, 8],
                                     parse_dates=[1])
            data_table.sort_values(['symbol', 'date'],
                                   ascending=[True, True],
                                   inplace=True)

    data_table.rename(columns={
        'unadjusted_open': 'open',
        'unadjusted_high': 'high',
        'unadjusted_low': 'low',
        'unadjusted_close': 'close',
        'unadjusted_volume': 'volume',
        'splits': 'split_ratio'
    }, inplace=True, copy=False)

    if show_progress:
        log.info(data_table.info())
        log.info(data_table.head())
    return data_table
  symbol        date   open   high    low  close   volume  dividends  splits
0      A  1999-11-18  45.50  50.00  40.00  44.00  4473990        0.0     1.0
1      A  1999-11-19  42.94  43.00  39.81  40.38  1089710        0.0     1.0
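The zip handling in load_data_table can be tried in isolation. Here’s a sketch that builds a one-file zip in memory and reads it back with Pandas the same way (the file name and contents are invented):

```python
import io
import pandas as pd
from zipfile import ZipFile

# Build a zip containing a single CSV, as load_data_table expects.
buf = io.BytesIO()
with ZipFile(buf, 'w') as zf:
    zf.writestr('prices.csv', 'symbol,date\nA,1999-11-18\n')

with ZipFile(buf) as zip_file:
    file_names = zip_file.namelist()
    assert len(file_names) == 1, "Expected a single file"
    with zip_file.open(file_names.pop()) as table_file:
        df = pd.read_csv(table_file, parse_dates=['date'])

print(df.symbol.iloc[0])  # A
```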

Create the Metadata

The metadata table provides the “mapping table” for our securities list. You can add custom fields here if you would like such as the company name or exchange.

def gen_asset_metadata(data, show_progress):
    if show_progress:
        log.info('Generating asset metadata.')
    data = data.groupby(
        by='symbol'
    ).agg(
        {'date': [np.min, np.max]}
    )
    data.reset_index(inplace=True)
    data['start_date'] = data.date.amin
    data['end_date'] = data.date.amax
    del data['date']
    data.columns = data.columns.get_level_values(0)
    data['auto_close_date'] = data['end_date'].values + pd.Timedelta(days=1)
    return data
sid symbol  start_date auto_close_date
0        A  1999-11-18      2019-08-13
1       AA  2016-11-01      2019-08-13
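As noted above, extra columns can be attached to the metadata before it’s written. A sketch, adding a flat exchange column to a made-up metadata frame (Zipline’s asset writer recognizes an exchange field alongside the dates):

```python
import pandas as pd

# Metadata shaped like gen_asset_metadata's output (values made up).
asset_metadata = pd.DataFrame({
    'symbol': ['A', 'AA'],
    'start_date': pd.to_datetime(['1999-11-18', '2016-11-01']),
    'end_date': pd.to_datetime(['2019-08-13', '2019-08-13']),
})

# Custom field: tag every asset with the same exchange.
asset_metadata['exchange'] = 'NYSE'
print(asset_metadata.columns.tolist())
```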

Write the Daily Bars

Now that we’ve got the metadata written, it’s time to grab a list of our stocks and write the daily bar data. sessions_in_range makes sure our dates are valid exchange trading days. We then call daily_bar_writer with our iterator function parse_pricing_and_vol, which completes the majority of the work.

parse_pricing_and_vol takes a MultiIndexed dataframe, and I use DataFrame.xs to return the appropriate cross-section. We index by the second MultiIndex level, symbol, and get back the remaining columns as a dataframe indexed by date. We then reindex against the trading sessions to ensure the data falls within the correct timeframe, and yield, pausing the function and handing the current symbol’s daily data back to daily_bar_writer before moving on to the next symbol.
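The xs cross-section is easiest to see on a toy MultiIndex frame (symbols, dates, and prices invented):

```python
import pandas as pd

dates = pd.to_datetime(['1999-11-18', '1999-11-19'])
idx = pd.MultiIndex.from_product([dates, ['A', 'AA']],
                                 names=['date', 'symbol'])
data = pd.DataFrame({'close': [44.0, 20.0, 40.38, 21.0]}, index=idx)

# Select symbol 'A' on the second index level; the result is
# indexed by date only, one row per session.
asset_data = data.xs('A', level=1)
print(asset_data['close'].tolist())  # [44.0, 40.38]
```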

symbol_map = asset_metadata.symbol
sessions = calendar.sessions_in_range(start_session, end_session)
raw_data.set_index(['date', 'symbol'], inplace=True)
daily_bar_writer.write(
    parse_pricing_and_vol(
        raw_data,
        sessions,
        symbol_map
    ),
    show_progress=show_progress
)

def parse_pricing_and_vol(data,
                          sessions,
                          symbol_map):
    for asset_id, symbol in iteritems(symbol_map):
        asset_data = data.xs(
            symbol,
            level=1
        ).reindex(
            sessions.tz_localize(None)
        ).fillna(0.0)
        yield asset_id, asset_data

Store the Adjustments

To make sure we’re able to backtest our strategies accurately, we need to provide Zipline with each stock’s dividend and split information.

I store the sid of each symbol in the dataframe and pass a filtered dataframe into both parse_splits and parse_dividends. You’ll notice I fill the non-essential missing dividend dates with NaT.
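The sid assignment leans on pandas categoricals: cat.codes numbers the symbols in sorted order, which matches the alphabetical row order gen_asset_metadata emits. A quick sketch with invented symbols:

```python
import pandas as pd

raw = pd.DataFrame({'symbol': ['AA', 'A', 'AA', 'A']})
raw['symbol'] = raw['symbol'].astype('category')

# Codes follow sorted category order: 'A' -> 0, 'AA' -> 1,
# matching the sorted symbol order in the asset metadata.
raw['sid'] = raw.symbol.cat.codes
print(raw['sid'].tolist())  # [1, 0, 1, 0]
```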

raw_data.reset_index(inplace=True)
raw_data['symbol'] = raw_data['symbol'].astype('category')
raw_data['sid'] = raw_data.symbol.cat.codes
adjustment_writer.write(
    splits=parse_splits(
        raw_data[[
            'sid',
            'date',
            'split_ratio',
        ]].loc[raw_data.split_ratio != 1],
        show_progress=show_progress
    ),
    dividends=parse_dividends(
        raw_data[[
            'sid',
            'date',
            'dividends',
        ]].loc[raw_data.dividends != 0],
        show_progress=show_progress
    )
)

def parse_splits(data, show_progress):
    if show_progress:
        log.info('Parsing split data.')
    data['split_ratio'] = 1.0 / data.split_ratio
    data.rename(
        columns={
            'split_ratio': 'ratio',
            'date': 'effective_date',
        },
        inplace=True,
        copy=False,
    )
    return data
   sid effective_date  ratio
  6827     1997-09-02   0.50
  7058     1998-08-03   0.25
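The inversion in parse_splits deserves a worked example: Quandl reports a 2-for-1 split as 2.0, while Zipline’s adjustment writer wants the multiplier applied to prices before the effective date, which is the reciprocal:

```python
import pandas as pd

# Quandl-style split column: 2.0 means a 2-for-1 split.
splits = pd.DataFrame({'split_ratio': [2.0, 4.0]})

# Zipline wants the price multiplier for earlier bars:
# a 2-for-1 split halves prior prices, so the ratio is 0.5.
splits['ratio'] = 1.0 / splits.split_ratio
print(splits['ratio'].tolist())  # [0.5, 0.25]
```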
def parse_dividends(data, show_progress):
    if show_progress:
        log.info('Parsing dividend data.')
    data['record_date'] = data['declared_date'] = data['pay_date'] = pd.NaT
    data.rename(columns={'date': 'ex_date',
                         'dividends': 'amount'}, inplace=True, copy=False)
    return data
   sid     ex_date  amount record_date declared_date pay_date
  3119  2012-03-30    0.10         NaT           NaT      NaT
  3173  2012-06-29    0.10         NaT           NaT      NaT

Register the Bundle

We’ve done all the hard work. Registering the bundle is easy: add two lines of code to your extension.py file. If you’re using Linux or macOS, your .zipline folder lives in your home directory. For instance, mine is located at /home/leosmigel/.zipline/extension.py.

I saved my bundle file as custom_quandl.py somewhere Python can import it (here, inside the zipline.data.bundles package) and named my ingest function ingest:

from zipline.data.bundles import register, custom_quandl
register('custom_quandl', custom_quandl.ingest, calendar_name='NYSE')

Ingest the Bundle

Depending on how much data you have, this step can take a while. Make sure you have your zipline environment enabled and run the following command, replacing custom_quandl with the name you registered your bundle under:

$ zipline ingest --bundle 'custom_quandl'

That’s it! As a sanity check, you’ll want to make sure your bundle file gives you the same results as the default Quandl bundle. If you need a quick strategy to use, you can use the DMA Strategy and add bundle='custom_quandl' to zipline.run_algorithm.

The Bundle File

Here’s my full bundle file to get you started. Happy hacking!

import pandas as pd
import numpy as np
from pathlib import Path
from six import iteritems
from logbook import Logger
from zipfile import ZipFile
show_progress = True
log = Logger(__name__)
home = str(Path.home()) + "/"
path = home + "your_data_file.zip"
column_names = ["symbol",
                "date",
                "unadjusted_open",
                "unadjusted_high",
                "unadjusted_low",
                "unadjusted_close",
                "unadjusted_volume",
                "dividends", "splits",
                "adjusted_open",
                "adjusted_high",
                "adjusted_low",
                "adjusted_close",
                "adjusted_volume"]

def load_data_table(file, show_progress=show_progress):
    # Load data table from Quandl.zip file.
    with ZipFile(file) as zip_file:
        file_names = zip_file.namelist()
        assert len(file_names) == 1, "Expected a single file"
        quandl_prices = file_names.pop()
        with zip_file.open(quandl_prices) as table_file:
            if show_progress:
                log.info('Parsing raw data.')
            data_table = pd.read_csv(table_file,
                                     names=column_names,
                                     index_col=False,
                                     usecols=[
                                         0, 1, 2, 3, 4, 5, 6, 7, 8],
                                     parse_dates=[1])
            data_table.sort_values(['symbol',
                                    'date'],
                                   ascending=[True, True],
                                   inplace=True)
    data_table.rename(columns={
        'unadjusted_open': 'open',
        'unadjusted_high': 'high',
        'unadjusted_low': 'low',
        'unadjusted_close': 'close',
        'unadjusted_volume': 'volume',
        'splits': 'split_ratio'
    }, inplace=True, copy=False)
    if show_progress:
        log.info(data_table.info())
        log.info(data_table.head())
    return data_table

def gen_asset_metadata(data, show_progress):
    if show_progress:
        log.info('Generating asset metadata.')
    data = data.groupby(
        by='symbol'
    ).agg(
        {'date': [np.min, np.max]}
    )
    data.reset_index(inplace=True)
    data['start_date'] = data.date.amin
    data['end_date'] = data.date.amax
    del data['date']
    data.columns = data.columns.get_level_values(0)
    data['auto_close_date'] = data['end_date'].values + pd.Timedelta(days=1)
    if show_progress:
        log.info(data.info())
        log.info(data.head())
    return data

def parse_splits(data, show_progress):
    if show_progress:
        log.info('Parsing split data.')
    data['split_ratio'] = 1.0 / data.split_ratio
    data.rename(
        columns={
            'split_ratio': 'ratio',
            'date': 'effective_date',
        },
        inplace=True,
        copy=False,
    )
    if show_progress:
        log.info(data.info())
        log.info(data.head())
    return data

def parse_dividends(data, show_progress):
    if show_progress:
        log.info('Parsing dividend data.')
    data['record_date'] = data['declared_date'] = data['pay_date'] = pd.NaT
    data.rename(columns={'date': 'ex_date',
                         'dividends': 'amount'}, inplace=True, copy=False)
    if show_progress:
        log.info(data.info())
        log.info(data.head())
    return data

def parse_pricing_and_vol(data,
                          sessions,
                          symbol_map):
    for asset_id, symbol in iteritems(symbol_map):
        asset_data = data.xs(
            symbol,
            level=1
        ).reindex(
            sessions.tz_localize(None)
        ).fillna(0.0)
        yield asset_id, asset_data

def ingest(environ,
           asset_db_writer,
           minute_bar_writer,
           daily_bar_writer,
           adjustment_writer,
           calendar,
           start_session,
           end_session,
           cache,
           show_progress,
           output_dir):
    raw_data = load_data_table(path, show_progress=show_progress)
    asset_metadata = gen_asset_metadata(
        raw_data[['symbol', 'date']],
        show_progress
    )
    asset_db_writer.write(asset_metadata)
    symbol_map = asset_metadata.symbol
    sessions = calendar.sessions_in_range(start_session, end_session)
    raw_data.set_index(['date', 'symbol'], inplace=True)
    daily_bar_writer.write(
        parse_pricing_and_vol(
            raw_data,
            sessions,
            symbol_map
        ),
        show_progress=show_progress
    )
    raw_data.reset_index(inplace=True)
    raw_data['symbol'] = raw_data['symbol'].astype('category')
    raw_data['sid'] = raw_data.symbol.cat.codes
    adjustment_writer.write(
        splits=parse_splits(
            raw_data[[
                'sid',
                'date',
                'split_ratio',
            ]].loc[raw_data.split_ratio != 1],
            show_progress=show_progress
        ),
        dividends=parse_dividends(
            raw_data[[
                'sid',
                'date',
                'dividends',
            ]].loc[raw_data.dividends != 0],
            show_progress=show_progress
        )
    )



Leo Smigel

Based in Pittsburgh, Analyzing Alpha is a blog by Leo Smigel exploring what works in the markets.