Advanced: prepare your data

Once you have extracted your data, you may need to clean it, transform it so that it fits a certain structure, and augment it with new computed fields. You may also want to merge several domains of data.

In Toucan Toco, all these operations are part of what we call the preprocess step, and can be performed with a few lines of code. Before rushing into our homemade data integration module, let us see what we can do with other species of our jungle!

Data integration in Toucan Toco vs. external tools

To use our data integration module, you will need basic knowledge of Python and Pandas.

But maybe you don’t want to be bothered with code. Maybe you don’t know Python. Maybe you prefer another language. Or maybe you’re a lucky user of those awesome ETL tools that already exist on the market. In such a case, do not hesitate to use your own ETL tools. We already have connectors and APIs that allow for a seamless integration with software such as SAP, Informatica, Talend or Dataiku.

For complete information, please see dedicated sections about our connectors and our API.

Toucan ETL

Toucan Toco provides an ETL functionality to create or transform your data. It is structured following the classic pattern:

  • Extract: each data source is extracted and has a “domain” name,
  • Transform: domains can be modified by several PIPELINE steps,
  • Load: chosen domains are loaded in our Data Store.

Declare your pipeline

Inside the etl_config.cson file, declare each transformation in a list called PIPELINE.

Each element of this list is an object that must have the following attributes:

  • input_domains a list of input domains (can be a string if there is only one input domain)
  • function_name the name of the Python function transforming input domains, defined in the augment.py file
  • output_domain the name of the output domain
  • load (optional, boolean) if false then this data will be available as input domain in PIPELINE steps, but will never be actually loaded in the Data Store. Useful for intermediate tables used as temporary data to compute a final result set (e.g. reference tables to be merged with other domains).

The load parameter is also available on items of the DATA_SOURCES section. An input data source might be useful for a transformation (reference tables for example) but may not need to be inserted in the Data Store.

Inside a single pipeline block, please be careful not to use the same name for output_domain and any input_domain. For example, if you have a function that transforms a domain, you want to get the transformed domain as output while keeping your input domain unchanged (so that you can still use it as an input domain in any other function):

input_domains: 'domain_1_input'
function_name: 'transform_domain_1'
output_domain: 'domain_1'

This way, you can chain transformation functions, hence the term pipeline! For example, you may chain these two pipeline steps:

  input_domains: 'domain_1_input'
  function_name: 'transform_domain_1'
  output_domain: 'domain_1_tmp'
,
  input_domains: ['domain_1_tmp', 'domain_2']
  function_name: 'merge_domain_1_and_2'
  output_domain: 'domain_merged'

Note how we used domain_1_tmp (output from the transform_domain_1 function) as input of the merge_domain_1_and_2 function.

Write your transformations

Transformation functions are in Python and need to be written in the augment.py file.

Before looking at how you can write these functions, let us focus on the tools available to you to work in Python.

Workflow / Environment

Python and Pandas

ETLs done at Toucan Toco use a Python library called Pandas (http://pandas.pydata.org/) to work with DataFrames.

Some of the most commonly used Pandas functions in processing are:

  • drop_duplicates
  • fillna
  • reset_index
  • merge
  • concat
  • melt
  • pivot_table
  • pivot
  • groupby
  • stack
  • unstack
  • pct_change
  • map

Toucan Toco SDK

Toucan Toco offers you a toolbox to develop and test your transformations. You can extract your datasources as DataFrames and run the augment.py script on your machine.

You can download our SDK and find related information here.

Examples
  • Get data sources
import getpass
from toucan_data_sdk import ToucanDataSdk

instance_url = 'https://api-demo.toucantoco.com'
auth = ('<username>', getpass.getpass())


sdk = ToucanDataSdk(instance_url, small_app='demo', auth=auth)
dfs = sdk.get_dfs()

This code block imports all your data sources into the Python dictionary dfs, with the names of your domains as keys and the corresponding pandas DataFrames as values.

If you want to get only some of the data sources (for example domain_1 and domain_2), you can replace the last code line with:

dfs = sdk.get_dfs(['domain_1', 'domain_2'])
  • Invalidate cache

    ToucanDataSdk.invalidate_cache()
    

To save you some time, the domains are cached on your machine. If your data sources have changed, you will need to invalidate your cache before calling sdk.get_dfs().

  • Get your augment file

    ToucanDataSdk.get_augment()
    

Also, please find examples of usage that you can download and run as Jupyter notebooks here (please see below for more details on Jupyter notebooks).

Jupyter

We recommend that you install Jupyter Notebook in your local environment.

We provide various examples as Jupyter notebooks in our GitHub repositories, for the use of our SDK and for the various tutorials of the present documentation.

It offers a convenient environment to test, write and document code. You can have a look at their documentation here.

You can also use JupyterLab, a web-based user interface for Jupyter notebooks and more.

Transformation functions

The domains are loaded into the Python environment as Pandas DataFrames.

In the augment.py file, you will write the functions declared in your PIPELINE. The parameters of a function are DataFrames corresponding to the input_domains. The function returns a DataFrame corresponding to the output_domain.

Concretely, for example if you defined the following PIPELINE:

input_domains: ['domain_1', 'domain_2']
function_name: 'merge_domain_1_and_2'
output_domain: 'domain_full'

Then it implies that you have a Python function in the augment.py file with the following signature:

def merge_domain_1_and_2(domain_1: pd.DataFrame, domain_2: pd.DataFrame) -> pd.DataFrame:
    """
    Some transformations...
    You can call several smaller functions in here - a good principle being to
    split your code into several small functions.
    Do not forget that you can use our utility functions in here - cf. dedicated
    chapter thereafter
    """

    domain_full = merge_function(domain_1, domain_2)
    domain_full = some_transformation_function(domain_full, some_parameter_1, some_parameter_2)

    return domain_full

Example of Toucan ETL

Tutorial: Product Corporation

You can download the CSV files to follow along with our tutorial that will illustrate the documentation of this page.

We want to define two input domains corresponding to the two CSV files (cf. the top of this page), and then merge the tables to bring information from the second domain into the first one.

Before joining the tables, we first have to clean the brand names in the mapping.

Here is the augment.py code in which we define two functions, one to clean the mapping dataframe, and the other to join it to the main data dataframe:

def clean_mapping(mapping):
    # Replace brand names
    to_replace = {'Fake Lux.': 'Fake Luxury',
                  'Golden Fragrances': 'Golden Fragrance',
                  'Luxury Cpy': 'Luxury Company'
                  }
    mapping['brand'] = mapping.brand.replace(to_replace)

    # Convert brand names to uppercase
    mapping['brand'] = mapping.brand.str.upper()

    return mapping


def merge_data_mapping(data, mapping):
    data = data.merge(mapping, on='brand', how='left')
    return data

And here is the etl_config.cson code which defines the input domains and the pipeline:

DATA_SOURCES: [
 domain: 'data_input'
 type: 'csv'
 file: 'data-product-corporation.csv'
 load: false
,
 domain: 'mapping_input'
 type: 'csv'
 file: 'mapping_brands_class.csv'
 load: false
]
PIPELINE: [
 input_domains: ['mapping_input']
 function_name: 'clean_mapping'
 output_domain: 'mapping_clean'
 load: false
,
 input_domains: ['data_input', 'mapping_clean']
 function_name: 'merge_data_mapping'
 output_domain: 'data'
]

Note how we specified load: false for domains not needed in our app (the raw input domains as well as the temporary mapping_clean domain that is only used for merging purposes in the merge_data_mapping function).

Also note that we used different names for input and output domains, and that we used the output of the clean_mapping function (i.e. mapping_clean) as input of the merge_data_mapping function.

Utility functions

We provide some utility functions to transform your data. These utility functions can be found inside the utils/generic folder in toucan-data-sdk.

Tips

In case of inconsistent results, check the index of the dataframe with df.index: it has to start at 0 and contain consecutive integers.
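For instance, here is a minimal sketch (on a hypothetical dataframe) of how to rebuild a clean index after filtering:

import pandas as pd

df = pd.DataFrame({'brand': ['A', 'B', 'C'], 'value': [1, 2, 3]})
df = df[df['value'] > 1]        # filtering leaves a gapped index (1, 2)
df = df.reset_index(drop=True)  # rebuild a clean index starting at 0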

add_missing_row

This function adds missing rows based on reference values.

from toucan_data_sdk.utils.generic import add_missing_row

The signature of the add_missing_row function is:

add_missing_row(df, id_cols, reference_col, complete_index=None, method=None, cols_to_keep=None)
  • df (pd.DataFrame): initial dataframe.
  • id_cols (list): name of the columns used to create each group.
  • reference_col (str): name of the column used to identify missing rows.
  • complete_index (list/pd.Series/dict) optional: a set of values used to add missing rows, or a dict to declare a date range; by default, the unique values of reference_col are used.
  • method (str) optional: the name of the method used to choose which missing rows to add.
    • None, add all missing rows,
    • between, add missing rows whose value lies between the min and max values of each group,
    • between_and_after, add missing rows whose value is bigger than the min value of each group,
    • between_and_before, add missing rows whose value is smaller than the max value of each group,
  • cols_to_keep (list(str)) optional: name of other columns to keep, linked to the reference_col.

Example

  • With all mandatory parameters
  • With complete_index
  • With complete_index and method="between"
  • With complete_index and method="between_and_after"
  • With complete_index and method="between_and_before"
  • With cols_to_keep
  • With complete_index as date range:

    complete_index = {
        'type': 'date',
        'format': '%Y-%m-%d',
        'freq': {'days': 1},
        'start': '2016-01-01',
        'end': '2016-01-04'
    }
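As an illustration only (the data and column names below are hypothetical), a call with the mandatory parameters might look like this:

import pandas as pd
from toucan_data_sdk.utils.generic import add_missing_row

# Hypothetical input: the ('South', '2018-02') row is missing
df = pd.DataFrame({
    'zone': ['North', 'North', 'South'],
    'month': ['2018-01', '2018-02', '2018-01'],
    'value': [10, 12, 7]
})

# Add all missing rows, based on the unique values of the reference column
result = add_missing_row(df, id_cols=['zone'], reference_col='month')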

clean_dataframe

This function is used to clean a dataframe. It can:

  • slugify the column names
  • convert column types to ‘category’ (if len(unique) < threshold) or ‘int’
  • clean the dataframe and rename columns if necessary

from toucan_data_sdk.utils.generic import clean_dataframe

The signature of the clean_dataframe function is:

clean_dataframe(df, slugify=True, threshold=50, rename_cols=None)
  • df (pd.DataFrame): initial dataframe.
  • slugify (boolean) optional: activate or deactivate the cleaning (slugification) of column names. By default, True.
  • threshold (int) optional: maximal number of categories in a column (used to identify its type).
  • rename_cols (dict) optional: rename dict option for columns.

Example

slugify = True
threshold = 2
rename_cols = {
    'age': 'Age'
}
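Putting these parameters together, here is a hedged sketch of the call on a hypothetical dataframe:

import pandas as pd
from toucan_data_sdk.utils.generic import clean_dataframe

# Hypothetical raw data with an untidy column name
df = pd.DataFrame({'Product Name': ['a', 'b', 'c'], 'age': [1, 2, 3]})

df = clean_dataframe(df, slugify=True, threshold=2, rename_cols={'age': 'Age'})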

combine_columns_aggregation

This function aggregates data based on all possible combinations of values found in specified columns.

from toucan_data_sdk.utils.generic import combine_columns_aggregation

The signature of the combine_columns_aggregation function is:

combine_columns_aggregation(df, id_cols, cols_for_combination, agg_func='sum')
  • df (pd.DataFrame): initial dataframe.
  • id_cols (list(str)): name of the columns used to create each group.
  • cols_for_combination (dict): name of the input columns for combination as key and their output label as value.
  • agg_func (callable/string/dict/list) optional: used for aggregating the data. By default, sum.

Example

  • With all required parameters:

    id_cols = ['Year']
    cols_for_combination = {
        'City': 'All Cities',
        'Category': 'All'
    }
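As a sketch on purely hypothetical data, the call corresponding to these parameters might look like:

import pandas as pd
from toucan_data_sdk.utils.generic import combine_columns_aggregation

df = pd.DataFrame({
    'Year': [2017, 2017, 2018],
    'City': ['Paris', 'London', 'Paris'],
    'Category': ['Food', 'Food', 'Tech'],
    'value': [1, 2, 3]
})

# Aggregate over every combination of City/Category, labelled 'All Cities'/'All'
result = combine_columns_aggregation(
    df,
    id_cols=['Year'],
    cols_for_combination={'City': 'All Cities', 'Category': 'All'}
)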

compute_cumsum

This function gives a cumsum by group.

from toucan_data_sdk.utils.generic import compute_cumsum

The signature of the compute_cumsum function is:

compute_cumsum(df, id_cols, reference_cols, value_cols, new_value_cols=None, cols_to_keep=None)
  • df (pd.DataFrame): initial dataframe.
  • id_cols (list): name of the columns used to create each group.
  • reference_cols (list): name of the columns used to order the cumsum.
  • value_cols (list): name of the columns to cumsum.
  • new_value_cols (list): name of the new columns with the cumsum.
  • cols_to_keep (list) optional: other columns to keep in the dataframe. This option can be used if there is only one row per group, identified by the columns id_cols and reference_cols.

Tips

For an aggregation YearToDate, the column DATE is the reference column and the column YEAR is an id column.

Example

  • With cols_to_keep
  • With new_value_cols
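For instance, a minimal sketch (hypothetical data and column names) using new_value_cols might be:

import pandas as pd
from toucan_data_sdk.utils.generic import compute_cumsum

# Hypothetical data: one row per (zone, date)
df = pd.DataFrame({
    'zone': ['North', 'North', 'South', 'South'],
    'date': ['2018-01', '2018-02', '2018-01', '2018-02'],
    'value': [10, 12, 7, 3]
})

# Cumulative sum of 'value' within each zone, ordered by date
result = compute_cumsum(
    df,
    id_cols=['zone'],
    reference_cols=['date'],
    value_cols=['value'],
    new_value_cols=['value_cumsum']
)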

compute_evolution

The function compute_evolution is deprecated since v21. Use compute_evolution_by_frequency instead.

compute_evolution_by_criteria

This function answers the question: how has a value changed compared to a specific value?

from toucan_data_sdk.utils.generic import compute_evolution_by_criteria

The signature of the compute_evolution_by_criteria function is:

compute_evolution_by_criteria(
  df,
  id_cols,
  value_col,
  compare_to,
  method='abs',
  format='column',
  offseted_suffix='_offseted',
  evolution_col_name='evolution_computed',
  raise_duplicate_error=True
)
  • df (pd.DataFrame): initial dataframe.
  • id_cols (list(str)): name of the columns used to create each group.
  • value_col (str): name of the column containing the value to compare.
  • compare_to (str): the query identifying a specific set of values for comparison.
  • method (str) optional: either abs for absolute values or pct for the evolution in percentage of the previous value.
  • format (str) optional: column by default, to only return the evolution column. Can also be df to return the entire dataframe with the offseted column and the evolution column. Use df for postprocess.
  • offseted_suffix (str) optional: suffix of the offseted column. By default, _offseted.
  • evolution_col_name (str) optional: name given to the evolution column. By default, evolution_computed.
  • raise_duplicate_error (bool) optional: raise an error (if set to True) or a warning (if set to False) when the dataframe has duplicated values for the given id_cols.

Example

How has a value changed compared to December?
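A possible sketch of this question, on hypothetical data and assuming compare_to accepts a pandas-style query string, could be:

import pandas as pd
from toucan_data_sdk.utils.generic import compute_evolution_by_criteria

df = pd.DataFrame({
    'zone': ['North', 'North', 'South', 'South'],
    'month': ['November', 'December', 'November', 'December'],
    'sales': [100, 120, 80, 90]
})

# Evolution of 'sales' compared to the December value of each zone
evolution = compute_evolution_by_criteria(
    df,
    id_cols=['zone'],
    value_col='sales',
    compare_to="month == 'December'"
)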

compute_evolution_by_frequency

This function answers the question: how has a value changed on a weekly, monthly or yearly basis?

from toucan_data_sdk.utils.generic import compute_evolution_by_frequency

The signature of the compute_evolution_by_frequency function is:

compute_evolution_by_frequency(
  df,
  id_cols,
  date_col,
  value_col,
  freq=1,
  method='abs',
  format='column',
  offseted_suffix='_offseted',
  evolution_col_name='evolution_computed',
  missing_date_as_zero=False,
  raise_duplicate_error=True
)
  • df (pd.DataFrame): initial dataframe.

  • id_cols (list(str)): name of the columns used to create each group.

  • date_col (str): name of the column containing the date.

  • value_col (str): name of the column containing the value to compare

  • freq (int/pd.DateOffset/pd.Series/dict): the frequency at which we calculate evolutions (anything that can be added to the pd.Series present in date_col). By default, 1. Can be a dict containing the frequency like:

    freq = {
        'years': 1,
        'days': 1,
        'months': 1
    }
    
  • method (str) optional: either abs for absolute values or pct for the evolution in percentage of the previous value.

  • format (str) optional: column by default, to only return the evolution column. Can also be df to return the entire dataframe with the offseted column and the evolution column. Use df for postprocess.

  • offseted_suffix (str) optional: suffix of the offseted column. By default, _offseted.

  • evolution_col_name (str) optional: name given to the evolution column. By default, evolution_computed.

  • missing_date_as_zero (boolean) optional: add missing date with zero value.

  • raise_duplicate_error (bool) optional: raise an error (if set to True) or a warning (if set to False) when the dataframe has duplicated values for the given id_cols.

Tips

When you use missing_date_as_zero=False and format='df', since both the original dataframe index and the evolution series index are matching, the result of the function can be added as a new column on the original dataframe.

Example

  • With all mandatory parameters
  • With freq as dict
  • With method="pct"
  • With missing_date_as_zero=True
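As an illustration of the method="pct" case, on hypothetical data:

import pandas as pd
from toucan_data_sdk.utils.generic import compute_evolution_by_frequency

df = pd.DataFrame({
    'zone': ['North', 'North', 'South', 'South'],
    'year': [2017, 2018, 2017, 2018],
    'sales': [100, 120, 80, 90]
})

# Year-over-year evolution of 'sales', in percentage of the previous value
evolution = compute_evolution_by_frequency(
    df,
    id_cols=['zone'],
    date_col='year',
    value_col='sales',
    freq=1,
    method='pct'
)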

compute_ffill_by_group

This function gives the result of a groupby with a ffill without the performance issues. The ffill method propagates the last valid value forward to the next values.

from toucan_data_sdk.utils.generic import compute_ffill_by_group

The signature of the compute_ffill_by_group function is:

compute_ffill_by_group(df, id_cols, reference_cols, value_col)
  • df (pd.DataFrame): initial dataframe.
  • id_cols (list(str)): name of the columns used to create each group.
  • reference_cols (list(str)): name of the columns used to sort.
  • value_col (str): name of the column to fill.

Example
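A minimal, illustrative sketch (hypothetical data and column names):

import numpy as np
import pandas as pd
from toucan_data_sdk.utils.generic import compute_ffill_by_group

# Hypothetical data with missing values to fill forward within each zone
df = pd.DataFrame({
    'zone': ['North', 'North', 'South', 'South'],
    'date': ['2018-01', '2018-02', '2018-01', '2018-02'],
    'value': [10, np.nan, 7, np.nan]
})

result = compute_ffill_by_group(
    df,
    id_cols=['zone'],
    reference_cols=['date'],
    value_col='value'
)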

roll_up

This function creates aggregates following a given hierarchy.

from toucan_data_sdk.utils.generic import roll_up

The signature of the roll_up function is:

roll_up(df, levels, groupby_vars, extra_groupby_cols=[], var_name='type', value_name='value', agg_func='sum', drop_levels=None)
  • df (pd.DataFrame): initial dataframe.
  • levels (list(str)): name of the columns composing the hierarchy (from the top to the bottom level).
  • groupby_vars (list(str)): name of the columns with value to aggregate.
  • extra_groupby_cols (list(str)) optional: other columns used to group in each level.
  • var_name (str) optional: name of the result variable column. By default, “type”.
  • value_name (str) optional: name of the result value column. By default, “value”.
  • agg_func (str) optional: name of the aggregation operation. By default, “sum”.
  • drop_levels (list(str)) optional: the names of the levels that you may want to discard from the output.

Example

  • With all mandatory parameters
  • With extra_groupby_cols
  • With agg_func='max'
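For illustration only, a sketch with all mandatory parameters and a hypothetical country > city hierarchy:

import pandas as pd
from toucan_data_sdk.utils.generic import roll_up

df = pd.DataFrame({
    'country': ['France', 'France', 'USA'],
    'city': ['Paris', 'Lyon', 'Boston'],
    'sales': [10, 5, 8]
})

# Aggregated rows for each level of the hierarchy, summing 'sales'
result = roll_up(
    df,
    levels=['country', 'city'],
    groupby_vars=['sales']
)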

two_values_melt

This function transforms one or multiple columns into rows.

Unlike the pandas melt function, two value columns can be returned by this function. For example, an evolution column and a price column.

from toucan_data_sdk.utils.generic import two_values_melt

The signature of the two_values_melt function is:

two_values_melt(df, first_value_vars, second_value_vars, var_name, value_name)
  • df (DataFrame): initial dataframe.
  • first_value_vars (list(str)): name of the columns corresponding to the first returned value column.
  • second_value_vars (list(str)): name of the columns corresponding to the second returned value column.
  • var_name (str): name of the column containing values in first_value_vars.
  • value_name (str): suffix of the two value columns (suffix_first / suffix_second).

Example
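A hedged sketch on hypothetical data, with one evolution column and one price column to melt:

import pandas as pd
from toucan_data_sdk.utils.generic import two_values_melt

df = pd.DataFrame({
    'product': ['A', 'B'],
    'evolution_year': [0.1, 0.2],
    'price_year': [100, 120]
})

result = two_values_melt(
    df,
    first_value_vars=['evolution_year'],
    second_value_vars=['price_year'],
    var_name='variable',
    value_name='year'
)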

Validate your transformations

A good general principle, to be able to monitor your code efficiently, is to structure your augment.py file using short functions that each transform only a limited subset of the input dataframes.

This approach will allow you to take advantage of custom functions designed to make augment.py files faster, easier to develop, test, maintain and refactor.

Preprocess validation

Add some data validation steps to quickly detect errors, i.e. to check that what has been written in the database after the preprocess step abides by some rules that you defined a priori. Preprocess validation is described inside a file under the preprocess directory of your small app. This file must be named preprocess_validation.cson.

You can also drop it via the studio in the modify configuration files page.

File structure

The top key of the file must be named rules. It must be an array (containing validation objects between brackets [], separated by commas). Each element of this array is an object with two keys: query and validation. Both keys are required.

The query object is the query that has to be run to get the data to validate. The validation object is a list of validation rules for the query result. (see below for the validation rules format)

Example
rules: [
  {
    query:
      domain: 'dashboards'
      country: 'United States'
    validation: [
      {
        type: 'rows'
        expected: 6
      },
      {
        type: 'columns'
        expected: ['metric', 'target']
      }
    ]
  },
  {
    query:
      domain: 'dashboards'
    validation: [
      {
        type: 'unique_values'
        expected: ['France', 'United States']
        params:
          column: 'country'
      },
      {
        type: 'no_duplicates'
        params:
          columns: ['metric']
      },
      {
        type: 'no_duplicates'
        params:
          columns: 'all'
      },
      {
        type: 'columns'
        expected: ['country', 'zone', 'metric', 'default', 'target']
      }
    ]
  },
  {
    query:
      domain: 'dashboards'
      country: 'United States'
      metric: 10
    validation: [
      {
        type: 'value'
        expected: 30
        params:
          column: 'target'
      }
    ]
  }
]

Assertions

You can use assertions to check important quality features of your data after applying some transformation in a function. A good principle is to make assertions in your short functions to test that they behave as expected.

For example, you may want to check that there are no null values in a column before returning a dataframe. Here is a way of doing it:

def some_function(df_input):
    ...
    # some transformations
    ...
    assert df_output.column_to_check.notnull().all(), "Some error message of your choice"
    return df_output

If the assertion is verified, the script will keep running; if not, an AssertionError will be raised with the custom message of your choice.

Debugging tools

pdb / ipdb

If you work from your local environment, you can use the pdb Python module, or the recommended ipdb module if you use Jupyter/IPython (compared to pdb, it offers features such as tab completion, syntax highlighting, better tracebacks, easy multi-line statements and more).

You can find detailed documentation here. The way of using ipdb and pdb is very similar for the most basic and frequent operations, which should be enough in most cases.

In general, you will only need to set a breakpoint in your code, so that when the script is executed it pauses at this breakpoint; you then have access to a debug console so that you can explore your variables and better understand what is happening at this precise moment of the execution.

The typical usage to break into the debugger from a running program is to insert:

import pdb; pdb.set_trace()

Please note that you can only use these modules in your local environment. The augment.py file cannot include pdb statements on our servers.

Logging

You can use logging as a way to display information in the console about the script execution.

You will need to create a logger object this way:

import logging

logger = logging.getLogger(__name__)

You can display a log message in the console like this:

logger.info('A message')

You can log variables values (including dataframes) like this:

logger.info(f'My variable value: { my_variable }')

You can find more details in the logging documentation here.

Additionally, the Toucan Toco SDK provides you with decorators to help with logging. Note that they all need a logger object as a parameter.

catch

This decorator catches any exception raised by the decorated function instead of propagating it, and logs information if the function failed.

Usage:

@catch(logger)
def some_function():
    return

log_message

This decorator logs a message after executing a function.

Usage:

@log_message(logger, "my message")
def some_function():
    return

log_time

This decorator logs the execution time of a function.

Usage:

@log_time(logger)
def some_function():
    return

log_shapes

This decorator logs the shapes of input and output dataframes.

Usage:

@log_shapes(logger)
def some_function():
    return

Partial ETL

You can launch your preprocess on a part of your data:

  • From the Data Source interface: update the domains available in the Data Store and impacted by a selected data source.
  • From the Data Store interface: update a selected domain available in the Data Store.

Preprocess notifications

When logging is not enough and you want to notify users that some event happened while preprocessing data, we provide a signal that can be emitted inside an augment function to trigger notifications.

This signal is called smallapp.from_augment. This is how to emit the signal:

from app.signal_manager.signals import signal_from_augment

def some_function():
  ...
  signal_from_augment.send(...)
  ...
  return

And here is an example configuration in the etl_config.cson:

NOTIFICATIONS: [
  {
    event: 'smallapp.from_augment'
    sender: [
      {
        user_groups: ['admin']
        channel: 'email'
        template: '...'
      }
    ]
  }
]

For more details on notifications, please see our dedicated section here. A complete working example is also provided in the demo application of every Toucan Toco instance.