Data Processing with Steps and Pipelines

Your primary tool for creating ETL mappings and automating your Incent processes is using step and pipeline domain objects. You can bundle a series of steps in a pipeline. Pipelines can be embedded within other pipelines.

Steps allow you to store xCL and xSQL expressions on the Connect service for later use. Pipelines bundle a series of steps and child pipelines into a single logical unit, and are also stored on the service. When a pipeline is invoked – manually or by a schedule domain object – each pipeline member is executed in sequence on the Connect server.

Together, steps and pipelines constitute the building blocks of your ETL processes within Connect. For example, you might create an ETL pipeline to load a raw data file into Incent and email any data validation errors to a distribution list.

Now, we’ll create some steps and then store them in a pipeline.

Steps

First, let’s create some steps that retrieve payment data for a specific time period.

1. Create a step that finds the “monthly” period type.

create step s_get_month_period_type as 
(set v_month_pt_name *= select name 
from xactly.xc_period_type 
where Uppercase(name) 
like 'MON%');

Screen Shot 2016-07-18 at 1.28.49 PM

Locate a status_message of “ok” in the output to confirm your command succeeded.

 

2. Create a step to set the last day of the monthly period to run.

create step s_set_period_end_date as 
(set v_period_end_date *= ToDate('2016-01-31'));

Again, confirm that your command succeeded in the command output.

 

3. Create a step that determines the period name.

create step s_set_period_name as 
(set v_period_name *= LookupPeriodName(:v_period_end_date,:v_month_pt_name));

Note that we used the variables created in the first two steps in this step’s definition.

4. Create a step to insert payment data into a custom delta table.

create step s_create_pay_extract as 
(insert into Delta(TableName='delta.payfile_extract', Overwrite=true)
select part.first_name, eff_participant_id, 
period_name, order_code, 
item_code, release_date, 
is_final,amount, ut.name
from xactly.xc_payment pay
join xactly.xc_unit_type ut on ut.unit_type_id = pay.amount_unit_type_id
join xactly.xc_participant part on part.participant_id = pay.eff_participant_id 
where pay.period_name = :v_period_name);

Note that the WHERE clause above references the v_period_name variable we set in step 3.

Show Steps

Let’s review the steps we created with the SHOW command.

show steps;

Screen Shot 2016-07-18 at 1.29.46 PM

Invoke Steps

Now we’ll invoke the steps we just created.

IMPORTANT: Order of step invocation is crucial, as some steps use values set in previous steps. Omit the keyword “synchronous” to invoke a step or pipeline asynchronously on the Connect server. We use synchronous in the examples to ensure a command completes before returning control back to the command prompt.

1. Invoke Step 1.
invoke synchronous step s_get_month_period_type;
invoke_step
2. Invoke Step 2.
invoke synchronous step s_set_period_end_date;
3. Invoke Step 3.
invoke synchronous step s_set_period_name;
4. Invoke Step 4.
invoke synchronous step s_create_pay_extract;

Again, after each command, locate a status_message of “ok” in the output to confirm your command succeeded.

Screen Shot 2016-07-18 at 1.40.23 PM

Show Invocations

Use SHOW INVOCATIONS to review the status of the steps and pipelines that have been executed.

select id, object_name, 
state, status, 
completed_instant 
from (show invocations) 
order by completed_instant;

Screen Shot 2016-07-18 at 1.41.38 PM

Alter Step

Use ALTER STEP to modify the command expression in an existing step.

Let’s change Step 1 to call Lowercase() instead of Uppercase().

alter step s_get_month_period_type as 
(set v_month_pt_name *= select name 
from xactly.xc_period_type 
where Lowercase(name) like 'mon%');

alter_step

Pipelines

Use pipelines to organize your steps into a single logical procedure. Pipelines are stored on the Connect server indefinitely unless a user issues a DROP PIPELINE command. A pipeline can contain steps and child pipelines. When invoked, the steps and child pipelines execute in sequence.

Create a Pipeline

In this example, we will create a new pipeline and then add the steps we created above.

create pipeline if not exists p_pay_extract;

create_pipeline

Add Steps to a Pipeline

Use ALTER PIPELINE to add steps to a pipeline.

1. Add Step 1.
alter pipeline p_pay_extract add step s_get_month_period_type;
alter_pipeline
2. Add Step 2.
alter pipeline p_pay_extract add step s_set_period_end_date;
3. Add Step 3.
alter pipeline p_pay_extract add step s_set_period_name;
4. Add Step 4.
alter pipeline p_pay_extract add step s_create_pay_extract;

Show Pipeline Members

Review which steps are in our pipeline now, and in what order.

show pipeline p_pay_extract members;

Screen Shot 2016-07-18 at 1.47.00 PM

 

Invoke Pipeline

It’s time to invoke our pipeline.

invoke synchronous pipeline p_pay_extract;

 

Show and Query Invocations

Use SHOW INVOCATIONS to review details for the executed pipeline.

select id, state, 
status, object_type, 
object_name, created_instant, 
completed_instant 
from (show invocations) 
where object_name = 'p_pay_extract';

Screen Shot 2016-07-18 at 1.47.51 PM

Now, you can query SHOW INVOCATION DETAILS using the id from the previous query.

select invocation_id, state, 
status, position, 
object_type, object_name, 
parent_name, completed_instant 
from (show invocation details) 
where invocation_id = 'ff8080815127d14e01515ab86f390aa2';

Screen Shot 2016-07-18 at 1.48.24 PM

Add Conditions to Pipeline Members

In some cases, you don’t want a pipeline to continue executing if a particular condition is not met. For example, if your pipeline loads a CSV file into Connect’s staging tables, the pipeline should abort if the CSV file is not found.

Add a conditional step to your pipeline. In this case, if the Condition evaluates to false, invoke the step in OnConditionFalse.

 

alter pipeline if exists p_load_orders
add step s_init_vars
as (Condition=(Exists(FilePath='raw_orders.csv')),
Retries=2,
RetryInterval=10,
OnConditionFalse=s_send_error_email,
AbortOnConditionFalse=true);

In our example:

  • If the raw_orders.csv file is not found, the s_init_vars step will not execute. Instead, the step in the OnConditionFalse parameter sends an error email.
  • The Retries parameter tells Connect to retry the step 2 times, while RetryInterval tells it to wait 10 minutes between tries.