Data Processing with Steps and Pipelines
Your primary tools for creating ETL mappings and automating your Incent processes are the step and pipeline domain objects. You can bundle a series of steps into a pipeline, and pipelines can be embedded within other pipelines.
Steps let you store xCL and xSQL expressions on the Connect service for later use. Pipelines bundle a series of steps and child pipelines into a single logical unit, and are also stored on the service. When a pipeline is invoked, either manually or by a schedule domain object, each pipeline member is executed in sequence on the Connect server.
Together, steps and pipelines constitute the building blocks of your ETL processes within Connect. For example, you might create an ETL pipeline to load a raw data file into Incent and email any data validation errors to a distribution list.
Now, we’ll create some steps and then store them in a pipeline.
Steps
First, let’s create some steps that retrieve payment data for a specific time period.
1. Create a step that finds the “monthly” period type.
create step s_get_month_period_type as (set v_month_pt_name *= select name from xactly.xc_period_type where Uppercase(name) like 'MON%');
Locate a status_message of “ok” in the output to confirm your command succeeded.
2. Create a step that sets the end date (the last day) of the monthly period you want to process.
create step s_set_period_end_date as (set v_period_end_date *= ToDate('2016-01-31'));
Again, confirm that your command succeeded in the command output.
3. Create a step that determines the period name.
create step s_set_period_name as (set v_period_name *= LookupPeriodName(:v_period_end_date,:v_month_pt_name));
Note that we used the variables created in the first two steps in this step’s definition.
4. Create a step to insert payment data into a custom delta table.
create step s_create_pay_extract as (
    insert into Delta(TableName='delta.payfile_extract', Overwrite=true)
    select part.first_name, eff_participant_id, period_name, order_code, item_code,
           release_date, is_final, amount, ut.name
    from xactly.xc_payment pay
    join xactly.xc_unit_type ut on ut.unit_type_id = pay.amount_unit_type_id
    join xactly.xc_participant part on part.participant_id = pay.eff_participant_id
    where pay.period_name = :v_period_name
);
Note that the WHERE clause above references the v_period_name variable we set in step 3.
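Steps 3 and 4 only work because earlier steps have already set the variables they read. The following Python sketch is a conceptual model, not Connect's implementation: it assumes all steps share one variable store, represents each step as a function, and uses a hypothetical `lookup_period_name` stand-in for LookupPeriodName() (the real function's output format is not shown in this tutorial). The period type value "Monthly" is likewise a hypothetical query result.

```python
from datetime import date

# Conceptual model only: one variable store shared by every step,
# standing in for the variables Connect keeps between step invocations.
variables: dict[str, object] = {}


def lookup_period_name(end_date: date, period_type: str) -> str:
    # Hypothetical stand-in for LookupPeriodName(); the real format may differ.
    return f"{period_type} ending {end_date.isoformat()}"


def s_get_month_period_type() -> None:
    variables["v_month_pt_name"] = "Monthly"  # hypothetical query result


def s_set_period_end_date() -> None:
    variables["v_period_end_date"] = date(2016, 1, 31)


def s_set_period_name() -> None:
    # Reads two variables written by earlier steps; raises KeyError if either is missing.
    variables["v_period_name"] = lookup_period_name(
        variables["v_period_end_date"], variables["v_month_pt_name"]
    )


def run(steps) -> None:
    """Invoke each step in the given order."""
    for step in steps:
        step()


if __name__ == "__main__":
    # Out of order: step 3 runs before the variables it reads exist.
    try:
        run([s_set_period_name])
    except KeyError as missing:
        print("failed, variable not set:", missing)

    # In order: every variable is set before it is read.
    run([s_get_month_period_type, s_set_period_end_date, s_set_period_name])
    print(variables["v_period_name"])
```

The same dependency is why the invocation order below matters.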
Show Steps
Let’s review the steps we created with the SHOW command.
show steps;
Invoke Steps
Now we’ll invoke the steps we just created.
IMPORTANT: The order in which you invoke steps is crucial, because some steps use values set by previous steps. Omit the keyword “synchronous” to invoke a step or pipeline asynchronously on the Connect server. We use synchronous in these examples so that each command completes before control returns to the command prompt.
1. Invoke Step 1.
invoke synchronous step s_get_month_period_type;
2. Invoke Step 2.
invoke synchronous step s_set_period_end_date;
3. Invoke Step 3.
invoke synchronous step s_set_period_name;
4. Invoke Step 4.
invoke synchronous step s_create_pay_extract;
Again, after each command, locate a status_message of “ok” in the output to confirm your command succeeded.
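The synchronous keyword changes when control returns to you, not what the step does. The Python sketch below is only an analogy, assuming a single-worker thread pool stands in for the Connect server: a synchronous invocation waits for the result, while an asynchronous one returns a handle immediately that you check on later. `invoke` and `slow_step` are hypothetical names.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


def slow_step() -> str:
    time.sleep(0.2)  # simulate work done on the server
    return "ok"


def invoke(server: ThreadPoolExecutor, step, synchronous: bool):
    """Submit a step to the 'server'; wait for it only if synchronous is True."""
    future: Future = server.submit(step)
    if synchronous:
        # Like INVOKE SYNCHRONOUS: block until the step finishes, then return its result.
        return future.result()
    # Like a plain INVOKE: return at once with a handle the caller can check later.
    return future


if __name__ == "__main__":
    # A single worker thread plays the role of the Connect server.
    with ThreadPoolExecutor(max_workers=1) as server:
        print(invoke(server, slow_step, synchronous=True))
        pending = invoke(server, slow_step, synchronous=False)
        print("finished yet?", pending.done())
        print("result:", pending.result())
```

In Connect, checking on an asynchronous invocation corresponds to querying SHOW INVOCATIONS, covered next.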
Show Invocations
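Use SHOW INVOCATIONS to review the status of the steps and pipelines that have been executed. Because SHOW INVOCATIONS returns a result set, you can wrap it in a SELECT to choose columns and sort the output: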
select id, object_name, state, status, completed_instant from (show invocations) order by completed_instant;
Alter Step
Use ALTER STEP to modify the command expression in an existing step.
Let’s change Step 1 to call Lowercase() instead of Uppercase(), and to match against the lowercase pattern 'mon%'.
alter step s_get_month_period_type as (set v_month_pt_name *= select name from xactly.xc_period_type where Lowercase(name) like 'mon%');
Pipelines
Use pipelines to organize your steps into a single logical procedure. Pipelines are stored on the Connect server indefinitely unless a user issues a DROP PIPELINE command. A pipeline can contain steps and child pipelines. When invoked, the steps and child pipelines execute in sequence.
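The sequential, nested execution described above can be pictured with a small Python model. This is a conceptual sketch, not how Connect is implemented: `Step`, `Pipeline`, and `execute` are hypothetical names, each step's stored expression is replaced by a Python callable, and we assume that adding a member appends it to the end of the pipeline.

```python
from dataclasses import dataclass, field
from typing import Callable, Union


@dataclass
class Step:
    name: str
    action: Callable[[], None]  # stands in for the stored xCL/xSQL expression


@dataclass
class Pipeline:
    name: str
    members: list["Member"] = field(default_factory=list)

    def add(self, member: "Member") -> None:
        # Assumed to behave like ALTER PIPELINE ... ADD STEP: append to the end.
        self.members.append(member)


# A pipeline member is either a step or a child pipeline.
Member = Union[Step, Pipeline]


def execute(member: Member, log: list[str]) -> None:
    """Run a step, or run a pipeline's members one after another, depth first."""
    if isinstance(member, Step):
        member.action()
        log.append(member.name)
    else:
        for child in member.members:
            execute(child, log)


if __name__ == "__main__":
    noop = lambda: None
    child = Pipeline("p_child", [Step("s_a", noop), Step("s_b", noop)])
    parent = Pipeline("p_parent")
    parent.add(Step("s_first", noop))
    parent.add(child)
    parent.add(Step("s_last", noop))
    log: list[str] = []
    execute(parent, log)
    print(log)  # ['s_first', 's_a', 's_b', 's_last']
```

The child pipeline's steps run at the position where the child pipeline was added, before the parent moves on to its next member.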
Create a Pipeline
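In this example, we create a new pipeline and then add the steps we created above. The IF NOT EXISTS clause keeps the command from failing if a pipeline named p_pay_extract already exists.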
create pipeline if not exists p_pay_extract;
Add Steps to a Pipeline
Use ALTER PIPELINE to add steps to a pipeline.
1. Add Step 1.
alter pipeline p_pay_extract add step s_get_month_period_type;
2. Add Step 2.
alter pipeline p_pay_extract add step s_set_period_end_date;
3. Add Step 3.
alter pipeline p_pay_extract add step s_set_period_name;
4. Add Step 4.
alter pipeline p_pay_extract add step s_create_pay_extract;
Show Pipeline Members
Use SHOW PIPELINE with the MEMBERS keyword to review which steps are now in the pipeline and in what order.
show pipeline p_pay_extract members;
Invoke Pipeline
It’s time to invoke our pipeline. This single command runs all four member steps in sequence.
invoke synchronous pipeline p_pay_extract;
Show and Query Invocations
Use SHOW INVOCATIONS to review details for the executed pipeline.
select id, state, status, object_type, object_name, created_instant, completed_instant from (show invocations) where object_name = 'p_pay_extract';
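Next, query SHOW INVOCATION DETAILS to see the individual members of that run, filtering on the id returned by the previous query. Replace the id below with the value from your own results.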
select invocation_id, state, status, position, object_type, object_name, parent_name, completed_instant from (show invocation details) where invocation_id = 'ff8080815127d14e01515ab86f390aa2';
Add Conditions to Pipeline Members
In some cases, you don’t want a pipeline to continue executing if a particular condition is not met. For example, if your pipeline loads a CSV file into Connect’s staging tables, the pipeline should abort if the CSV file is not found.
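Add a conditional step to your pipeline. If the Condition evaluates to false, Connect invokes the step named in OnConditionFalse instead. This example uses a separate pipeline, p_load_orders, and two steps, s_init_vars and s_send_error_email, that are not created in this tutorial.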
alter pipeline if exists p_load_orders add step s_init_vars as (Condition=(Exists(FilePath='raw_orders.csv')), Retries=2, RetryInterval=10, OnConditionFalse=s_send_error_email, AbortOnConditionFalse=true);
In our example:
- If the raw_orders.csv file is not found, the s_init_vars step does not execute. Instead, the step named in the OnConditionFalse parameter, s_send_error_email, sends an error email.
- Because AbortOnConditionFalse is true, the pipeline then stops instead of continuing to its remaining members.
- The Retries parameter tells Connect to retry the step 2 times, and RetryInterval tells it to wait 10 minutes between tries.
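To make the evaluation order concrete, here is a Python sketch of a single conditional member. It is a model under stated assumptions, not Connect's implementation: `run_conditional_member` and `PipelineAborted` are hypothetical names; we assume each retry re-evaluates the Condition after waiting RetryInterval; and the interval is given in seconds so the sketch runs quickly.

```python
import time
from pathlib import Path
from typing import Callable, Optional


class PipelineAborted(Exception):
    """Raised when the condition stays false and abort_on_condition_false is set."""


def run_conditional_member(
    action: Callable[[], None],
    condition: Callable[[], bool],
    retries: int,
    retry_interval_s: float,
    on_condition_false: Optional[Callable[[], None]] = None,
    abort_on_condition_false: bool = False,
) -> bool:
    """Return True if the member ran, False if it was skipped without aborting."""
    # Assumed semantics: one initial check plus up to `retries` re-checks.
    for attempt in range(retries + 1):
        if condition():
            action()
            return True
        if attempt < retries:
            time.sleep(retry_interval_s)  # wait before re-evaluating the condition
    if on_condition_false is not None:
        on_condition_false()  # e.g. the step that sends the error email
    if abort_on_condition_false:
        raise PipelineAborted("condition still false after retries")
    return False


if __name__ == "__main__":
    events: list[str] = []
    try:
        run_conditional_member(
            action=lambda: events.append("s_init_vars"),
            condition=lambda: Path("raw_orders.csv").exists(),
            retries=2,
            retry_interval_s=0.01,  # seconds here; the xCL example waits 10 minutes
            on_condition_false=lambda: events.append("s_send_error_email"),
            abort_on_condition_false=True,
        )
    except PipelineAborted as exc:
        print("aborted:", exc)
    print(events)
```

Raising an exception for the abort case lets the enclosing pipeline loop stop without checking a return flag after every member.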