Pipelines and Activities in Azure Data Factory
Note
This article applies to version 1 of Data Factory. If you are using the current version of the Data Factory service, see Pipelines in V2.
This article helps you understand pipelines and activities in Azure Data Factory and use them to construct end-to-end data-driven workflows for your data movement and data processing scenarios.
Note
This article assumes that you have gone through Introduction to Azure Data Factory. If you do not have hands-on experience with creating data factories, going through the data transformation tutorial and/or the data movement tutorial will help you understand this article better.
Note
We recommend that you use the Azure Az PowerShell module to interact with Azure. See Install Azure PowerShell to get started. To learn how to migrate to the Az PowerShell module, see Migrate Azure PowerShell from AzureRM to Az.
Overview
A data factory can have one or more pipelines. A pipeline is a logical grouping of activities that together perform a task. The activities in a pipeline define actions to perform on your data. For example, you might use a copy activity to copy data from a SQL Server database to Azure Blob storage. Then, use a Hive activity that runs a Hive script on an Azure HDInsight cluster to process/transform the data from blob storage and produce output data. Finally, use a second copy activity to copy the output data to Azure Synapse Analytics, on top of which business intelligence (BI) reporting solutions are built.
An activity can take zero or more input datasets and produce one or more output datasets. The following diagram shows the relationship between pipeline, activity, and dataset in Data Factory:
A pipeline allows you to manage activities as a set instead of each one individually. For example, you can deploy, schedule, suspend, and resume a pipeline, instead of dealing with activities in the pipeline independently.
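For example, with the Az PowerShell module you can suspend or resume an entire pipeline with a single cmdlet call. The following is a minimal sketch; the resource group, data factory, and pipeline names are placeholders.
# Minimal sketch: suspend and later resume a pipeline (and therefore all of its activities).
# The resource group, data factory, and pipeline names below are placeholders.
$rg = "ADFTutorialResourceGroup"
$df = "MyFirstDataFactory"

# Stop the pipeline from producing any more output slices.
Suspend-AzDataFactoryPipeline -ResourceGroupName $rg -DataFactoryName $df -Name "CopyPipeline"

# Resume it when you are ready to continue processing.
Resume-AzDataFactoryPipeline -ResourceGroupName $rg -DataFactoryName $df -Name "CopyPipeline"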
Data Factory supports two types of activities: data movement activities and data transformation activities. Each activity can have zero or more input datasets and produce one or more output datasets.
An input dataset represents the input for an activity in the pipeline, and an output dataset represents the output for the activity. Datasets identify data within different data stores, such as tables, files, folders, and documents. After you create a dataset, you can use it with activities in a pipeline. For example, a dataset can be an input or output dataset of a Copy Activity or an HDInsight Hive Activity. For more information about datasets, see the Datasets in Azure Data Factory article.
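As a quick illustration, a dataset that you have defined in a local JSON file can be deployed to a data factory with the Az PowerShell module. This is a minimal sketch; the file path and the resource group and factory names are placeholders.
# Minimal sketch: deploy a dataset definition stored in a local JSON file.
# The file path and the resource group and data factory names are placeholders.
New-AzDataFactoryDataset -ResourceGroupName "ADFTutorialResourceGroup" `
    -DataFactoryName "MyFirstDataFactory" -File ".\InputDataset.json"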
Data movement activities
Copy Activity in Data Factory copies data from a source data store to a sink data store. Data Factory supports the following data stores. Data from any source can be written to any sink. Click a data store to learn how to copy data to and from that store.
Category | Data store | Supported as a source | Supported as a sink |
---|---|---|---|
Azure | Azure Blob storage | ✓ | ✓ |
Azure | Azure Cosmos DB for NoSQL | ✓ | ✓ |
Azure | Azure Data Lake Storage Gen1 | ✓ | ✓ |
Azure | Azure SQL Database | ✓ | ✓ |
Azure | Azure Synapse Analytics | ✓ | ✓ |
Azure | Azure Cognitive Search Index | | ✓ |
Azure | Azure Table storage | ✓ | ✓ |
Databases | Amazon Redshift | ✓ | |
Databases | DB2* | ✓ | |
Databases | MySQL* | ✓ | |
Databases | Oracle* | ✓ | ✓ |
Databases | PostgreSQL* | ✓ | |
Databases | SAP Business Warehouse* | ✓ | |
Databases | SAP HANA* | ✓ | |
Databases | SQL Server* | ✓ | ✓ |
Databases | Sybase* | ✓ | |
Databases | Teradata* | ✓ | |
NoSQL | Cassandra* | ✓ | |
NoSQL | MongoDB* | ✓ | |
File | Amazon S3 | ✓ | |
File | File System* | ✓ | ✓ |
File | FTP | ✓ | |
File | HDFS* | ✓ | |
File | SFTP | ✓ | |
Others | Generic HTTP | ✓ | |
Others | Generic OData | ✓ | |
Others | Generic ODBC* | ✓ | |
Others | Salesforce | ✓ | |
Others | Web Table (table from HTML) | ✓ | |
Note
Data stores with * can be on-premises or on Azure IaaS, and require you to install Data Management Gateway on an on-premises/Azure IaaS machine.
For more information, see Data Movement Activities article.
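For the data stores marked with *, the copy activity can only reach the store through a Data Management Gateway that is registered with the data factory. The following is a minimal provisioning sketch with the Az PowerShell module; the names are placeholders, and generating the registration key and installing the gateway software on the on-premises/Azure IaaS machine are separate steps that are not shown here.
# Minimal sketch: create a logical gateway in the data factory.
# The names are placeholders; installing and registering the gateway software
# on the on-premises/Azure IaaS machine is a separate step.
New-AzDataFactoryGateway -ResourceGroupName "ADFTutorialResourceGroup" `
    -DataFactoryName "MyFirstDataFactory" -Name "MyOnPremisesGateway"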
Data transformation activities
Azure Data Factory supports the following transformation activities that can be added to pipelines either individually or chained with another activity.
Data transformation activity | Compute environment |
---|---|
Hive | HDInsight [Hadoop] |
Pig | HDInsight [Hadoop] |
MapReduce | HDInsight [Hadoop] |
Hadoop Streaming | HDInsight [Hadoop] |
Spark | HDInsight [Hadoop] |
ML Studio (classic) activities: Batch Execution and Update Resource | Azure VM |
Stored Procedure | Azure SQL, Azure Synapse Analytics, or SQL Server |
Data Lake Analytics U-SQL | Azure Data Lake Analytics |
DotNet | HDInsight [Hadoop] or Azure Batch |
Note
You can use the MapReduce activity to run Spark programs on your HDInsight Spark cluster. See Invoke Spark programs from Azure Data Factory for details. You can create a custom activity to run R scripts on your HDInsight cluster with R installed. See Run R Script using Azure Data Factory.
For more information, see Data Transformation Activities article.
Custom .NET activities
If you need to move data to/from a data store that the Copy Activity doesn't support, or transform data using your own logic, create a custom .NET activity. For details on creating and using a custom activity, see Use custom activities in an Azure Data Factory pipeline.
Schedule pipelines
A pipeline is active only between its start time and end time. It is not executed before the start time or after the end time. If the pipeline is paused, it does not get executed irrespective of its start and end time. For a pipeline to run, it should not be paused. See Scheduling and Execution to understand how scheduling and execution works in Azure Data Factory.
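If you did not set start and end times when you created the pipeline, you can set (or later adjust) the active period from PowerShell. The following is a minimal sketch; all names and dates are placeholders.
# Minimal sketch: set the active period of an existing pipeline.
# Output slices are produced only between these two date-times.
Set-AzDataFactoryPipelineActivePeriod -ResourceGroupName "ADFTutorialResourceGroup" `
    -DataFactoryName "MyFirstDataFactory" -PipelineName "CopyPipeline" `
    -StartDateTime "2016-07-12T00:00:00Z" -EndDateTime "2016-07-13T00:00:00Z"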
Pipeline JSON
Let us take a closer look at how a pipeline is defined in JSON format. The generic structure for a pipeline looks as follows:
{
  "name": "PipelineName",
  "properties":
  {
    "description" : "pipeline description",
    "activities":
    [
    ],
    "start": "<start date-time>",
    "end": "<end date-time>",
    "isPaused": true/false,
    "pipelineMode": "scheduled/onetime",
    "expirationTime": "15.00:00:00",
    "datasets":
    [
    ]
  }
}
Tag | Description | Required |
---|---|---|
name | Name of the pipeline. Specify a name that represents the action that the pipeline performs. | Yes |
description | Text describing what the pipeline is used for. | Yes |
activities | The activities section can have one or more activities defined within it. See the next section for details about the activities JSON element. | Yes |
start | Start date-time for the pipeline. Must be in ISO format, for example: 2016-10-14T16:32:41Z. It is possible to specify a local time, for example an EST time: 2016-02-27T06:00:00-05:00, which is 6 AM EST. The start and end properties together specify the active period for the pipeline. Output slices are only produced within this active period. | No. If you specify a value for the end property, you must specify a value for the start property. The start and end times can both be empty to create a pipeline, but you must specify both values to set an active period for the pipeline to run. If you do not specify start and end times when creating a pipeline, you can set them later by using the Set-AzDataFactoryPipelineActivePeriod cmdlet. |
end | End date-time for the pipeline. If specified, it must be in ISO format, for example: 2016-10-14T17:32:41Z. It is possible to specify a local time, for example an EST time: 2016-02-27T06:00:00-05:00, which is 6 AM EST. To run the pipeline indefinitely, specify 9999-09-09 as the value for the end property. A pipeline is active only between its start time and end time; it is not executed before the start time or after the end time. If the pipeline is paused, it does not get executed, irrespective of its start and end time. For a pipeline to run, it should not be paused. See Scheduling and Execution to understand how scheduling and execution works in Azure Data Factory. | No. If you specify a value for the start property, you must specify a value for the end property. See the notes for the start property. |
isPaused | If set to true, the pipeline does not run (it is in the paused state). The default value is false. You can use this property to enable or disable a pipeline. | No |
pipelineMode | The method for scheduling runs for the pipeline. Allowed values are scheduled (default) and onetime. Scheduled indicates that the pipeline runs at a specified time interval according to its active period (start and end time). Onetime indicates that the pipeline runs only once; once created, one-time pipelines currently cannot be modified or updated. See Onetime pipeline for details about the onetime setting. | No |
expirationTime | Duration of time after creation for which the one-time pipeline is valid and should remain provisioned. If it does not have any active, failed, or pending runs, the pipeline is automatically deleted once it reaches the expiration time. The default value is "expirationTime": "3.00:00:00". | No |
datasets | List of datasets to be used by activities defined in the pipeline. This property can be used to define datasets that are specific to this pipeline and not defined within the data factory. Datasets defined within this pipeline can only be used by this pipeline and cannot be shared. See Scoped datasets for details. | No |
Activity JSON
The activities section can have one or more activities defined within it. Each activity has the following top-level structure:
{
  "name": "ActivityName",
  "description": "description",
  "type": "<ActivityType>",
  "inputs": [],
  "outputs": [],
  "linkedServiceName": "MyLinkedService",
  "typeProperties":
  {
  },
  "policy":
  {
  },
  "scheduler":
  {
  }
}
The following table describes the properties in the activity JSON definition:
Tag | Description | Required |
---|---|---|
name | Name of the activity. Specify a name that represents the action that the activity performs. | Yes |
description | Text describing what the activity is used for. | Yes |
type | Type of the activity. See the Data movement activities and Data transformation activities sections for the different types of activities. | Yes |
inputs | Input tables used by the activity. For example, one input table: "inputs": [ { "name": "inputtable1" } ]; two input tables: "inputs": [ { "name": "inputtable1" }, { "name": "inputtable2" } ]. | Yes |
outputs | Output tables used by the activity. For example, one output table: "outputs": [ { "name": "outputtable1" } ]; two output tables: "outputs": [ { "name": "outputtable1" }, { "name": "outputtable2" } ]. | Yes |
linkedServiceName | Name of the linked service used by the activity. An activity may require that you specify the linked service that links to the required compute environment. | Yes for HDInsight Activity and ML Studio (classic) Batch Scoring Activity. No for all others. |
typeProperties | Properties in the typeProperties section depend on the type of the activity. To see the type properties for an activity, click the links to the activity in the previous section. | No |
policy | Policies that affect the run-time behavior of the activity. If not specified, default policies are used. | No |
scheduler | The scheduler property is used to define the desired scheduling for the activity. Its subproperties are the same as the ones in the availability property of a dataset. | No |
Policies
Policies affect the run-time behavior of an activity, specifically when the slice of a table is processed. The following table provides the details.
Property | Permitted values | Default value | Description |
---|---|---|---|
concurrency | Integer. Max value: 10 | 1 | Number of concurrent executions of the activity. It determines the number of parallel activity executions that can happen on different slices. For example, if an activity needs to go through a large set of available data, having a larger concurrency value speeds up the data processing. |
executionPriorityOrder | NewestFirst, OldestFirst | OldestFirst | Determines the ordering of data slices that are being processed. For example, suppose you have two slices (one at 4 PM and another at 5 PM) and both are pending execution. If you set executionPriorityOrder to NewestFirst, the 5 PM slice is processed first. Similarly, if you set executionPriorityOrder to OldestFirst, the 4 PM slice is processed first. |
retry | Integer. Max value: 10 | 0 | Number of retries before the data processing for the slice is marked as Failure. Activity execution for a data slice is retried up to the specified retry count. The retry is done as soon as possible after the failure. |
timeout | TimeSpan | 00:00:00 | Timeout for the activity. Example: 00:10:00 (implies a timeout of 10 minutes). If a value is not specified or is 0, the timeout is infinite. If the data processing time on a slice exceeds the timeout value, it is canceled, and the system attempts to retry the processing. The number of retries depends on the retry property. When a timeout occurs, the status is set to TimedOut. |
delay | TimeSpan | 00:00:00 | Specify the delay before data processing of the slice starts. The execution of an activity for a data slice starts after the delay is past the expected execution time. Example: 00:10:00 (implies a delay of 10 minutes). |
longRetry | Integer. Max value: 10 | 1 | The number of long retry attempts before the slice execution is failed. longRetry attempts are spaced by longRetryInterval, so if you need to specify a time between retry attempts, use longRetry. If both retry and longRetry are specified, each longRetry attempt includes retry attempts, and the maximum number of attempts is retry * longRetry. For example, suppose the activity policy specifies retry: 3, longRetry: 2, and longRetryInterval: 01:00:00, and assume there is only one slice to execute (status is Waiting) and the activity execution fails every time. Initially there would be 3 consecutive execution attempts; after each attempt, the slice status would be Retry. After the first 3 attempts are over, the slice status would be LongRetry. After an hour (that is, longRetryInterval's value), there would be another set of 3 consecutive execution attempts; after that, the slice status would be Failed and no more retries would be attempted. Hence, overall 6 attempts were made. If any execution succeeds, the slice status would be Ready and no more retries are attempted. longRetry can be used in situations where dependent data arrives at non-deterministic times, or the overall environment in which data processing occurs is flaky. In such cases, doing retries one after another may not help, whereas doing so after an interval of time results in the desired output. Word of caution: do not set high values for longRetry or longRetryInterval. Typically, higher values imply other systemic issues. |
longRetryInterval | TimeSpan | 00:00:00 | The delay between long retry attempts. |
Sample copy pipeline
In the following sample pipeline, there is one activity of type Copy in the activities section. In this sample, the copy activity copies data from Azure Blob storage to Azure SQL Database.
{
  "name": "CopyPipeline",
  "properties": {
    "description": "Copy data from a blob to Azure SQL table",
    "activities": [
      {
        "name": "CopyFromBlobToSQL",
        "type": "Copy",
        "inputs": [
          {
            "name": "InputDataset"
          }
        ],
        "outputs": [
          {
            "name": "OutputDataset"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "BlobSource"
          },
          "sink": {
            "type": "SqlSink",
            "writeBatchSize": 10000,
            "writeBatchTimeout": "60:00:00"
          }
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "NewestFirst",
          "retry": 0,
          "timeout": "01:00:00"
        }
      }
    ],
    "start": "2016-07-12T00:00:00Z",
    "end": "2016-07-13T00:00:00Z"
  }
}
Note the following points:
- In the activities section, there is only one activity whose type is set to Copy.
- Input for the activity is set to InputDataset and output for the activity is set to OutputDataset. See Datasets article for defining datasets in JSON.
- In the typeProperties section, BlobSource is specified as the source type and SqlSink is specified as the sink type. In the Data movement activities section, click the data store that you want to use as a source or a sink to learn more about moving data to/from that data store.
For a complete walkthrough of creating this pipeline, see Tutorial: Copy data from Blob Storage to SQL Database.
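If you save this definition to a local file, one way to deploy it is with the Az PowerShell module. The following is a minimal sketch; the file path and the resource group and factory names are placeholders.
# Minimal sketch: deploy the pipeline definition saved as CopyPipeline.json.
# The file path and the resource group and data factory names are placeholders.
New-AzDataFactoryPipeline -ResourceGroupName "ADFTutorialResourceGroup" `
    -DataFactoryName "MyFirstDataFactory" -File ".\CopyPipeline.json"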
Sample transformation pipeline
In the following sample pipeline, there is one activity of type HDInsightHive in the activities section. In this sample, the HDInsight Hive activity transforms data in Azure Blob storage by running a Hive script file on an Azure HDInsight Hadoop cluster.
{
  "name": "TransformPipeline",
  "properties": {
    "description": "My first Azure Data Factory pipeline",
    "activities": [
      {
        "type": "HDInsightHive",
        "typeProperties": {
          "scriptPath": "adfgetstarted/script/partitionweblogs.hql",
          "scriptLinkedService": "AzureStorageLinkedService",
          "defines": {
            "inputtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/inputdata",
            "partitionedtable": "wasb://adfgetstarted@<storageaccountname>.blob.core.windows.net/partitioneddata"
          }
        },
        "inputs": [
          {
            "name": "AzureBlobInput"
          }
        ],
        "outputs": [
          {
            "name": "AzureBlobOutput"
          }
        ],
        "policy": {
          "concurrency": 1,
          "retry": 3
        },
        "scheduler": {
          "frequency": "Month",
          "interval": 1
        },
        "name": "RunSampleHiveActivity",
        "linkedServiceName": "HDInsightOnDemandLinkedService"
      }
    ],
    "start": "2016-04-01T00:00:00Z",
    "end": "2016-04-02T00:00:00Z",
    "isPaused": false
  }
}
Note the following points:
- In the activities section, there is only one activity whose type is set to HDInsightHive.
- The Hive script file, partitionweblogs.hql, is stored in the Azure storage account (specified by scriptLinkedService, called AzureStorageLinkedService), in the script folder of the container adfgetstarted.
- The defines section is used to specify the runtime settings that are passed to the Hive script as Hive configuration values (for example, `${hiveconf:inputtable}`, `${hiveconf:partitionedtable}`).
The typeProperties section is different for each transformation activity. To learn about type properties supported for a transformation activity, click the transformation activity in the Data transformation activities table.
For a complete walkthrough of creating this pipeline, see Tutorial: Build your first pipeline to process data using Hadoop cluster.
Multiple activities in a pipeline
The previous two sample pipelines have only one activity in them. You can have more than one activity in a pipeline.
If you have multiple activities in a pipeline and the output of one activity is not an input of another activity, the activities may run in parallel if the input data slices for the activities are ready.
You can chain two activities by having the output dataset of one activity as the input dataset of the other activity. The second activity executes only when the first one completes successfully.
In this sample, the pipeline has two activities: Activity1 and Activity2. Activity1 takes Dataset1 as an input and produces Dataset2 as an output. Activity2 takes Dataset2 as an input and produces Dataset3 as an output. Since the output of Activity1 (Dataset2) is the input of Activity2, Activity2 runs only after Activity1 completes successfully and produces the Dataset2 slice. If Activity1 fails for some reason and does not produce the Dataset2 slice, Activity2 does not run for that slice (for example: 9 AM to 10 AM).
You can also chain activities that are in different pipelines.
In this sample, Pipeline1 has only one activity, which takes Dataset1 as an input and produces Dataset2 as an output. Pipeline2 also has only one activity, which takes Dataset2 as an input and produces Dataset3 as an output.
For more information, see scheduling and execution.
Create and monitor pipelines
You can create pipelines by using one of these tools or SDKs.
- Copy Wizard
- Visual Studio
- Azure PowerShell
- Azure Resource Manager template
- REST API
- .NET API
See the following tutorials for step-by-step instructions for creating pipelines by using one of these tools or SDKs.
Once a pipeline is created/deployed, you can manage and monitor your pipelines by using the Azure portal blades or Monitor and Manage App. See the following topics for step-by-step instructions.
- Monitor and manage pipelines by using Azure portal blades.
- Monitor and manage pipelines by using Monitor and Manage App
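You can also check pipeline and slice status from PowerShell. The following is a minimal sketch; the names and the date are placeholders, and it assumes the sample copy pipeline and its OutputDataset from earlier in this article.
# Minimal sketch: inspect a deployed pipeline and the slices of its output dataset.
$rg = "ADFTutorialResourceGroup"
$df = "MyFirstDataFactory"

# Show the pipeline definition and properties as deployed.
Get-AzDataFactoryPipeline -ResourceGroupName $rg -DataFactoryName $df -Name "CopyPipeline"

# List data slices of the output dataset for the given period.
# Each slice reports a status such as Waiting, InProgress, Ready, Failed, or Retry.
Get-AzDataFactorySlice -ResourceGroupName $rg -DataFactoryName $df -DatasetName "OutputDataset" `
    -StartDateTime "2016-07-12T00:00:00Z"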
Onetime pipeline
You can create and schedule a pipeline to run periodically (for example: hourly or daily) within the start and end times you specify in the pipeline definition. See Scheduling activities for details. You can also create a pipeline that runs only once. To do so, you set the pipelineMode property in the pipeline definition to onetime as shown in the following JSON sample. The default value for this property is scheduled.
{
  "name": "CopyPipeline",
  "properties": {
    "activities": [
      {
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "BlobSource",
            "recursive": false
          },
          "sink": {
            "type": "BlobSink",
            "writeBatchSize": 0,
            "writeBatchTimeout": "00:00:00"
          }
        },
        "inputs": [
          {
            "name": "InputDataset"
          }
        ],
        "outputs": [
          {
            "name": "OutputDataset"
          }
        ],
        "name": "CopyActivity-0"
      }
    ],
    "pipelineMode": "OneTime"
  }
}
Note the following:
- Start and end times for the pipeline are not specified.
- Availability of input and output datasets is specified (frequency and interval), even though Data Factory does not use the values.
- Diagram view does not show one-time pipelines. This behavior is by design.
- One-time pipelines cannot be updated. You can clone a one-time pipeline, rename it, update properties, and deploy it to create another one.
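Because a one-time pipeline cannot be updated in place, replacing it means deploying an edited copy of its JSON definition under a new name (and optionally removing the old pipeline). The following is a minimal sketch; all names and the file path are placeholders.
# Minimal sketch: optionally remove the old one-time pipeline, then deploy a renamed,
# updated copy of its JSON definition. All names and the file path are placeholders.
Remove-AzDataFactoryPipeline -ResourceGroupName "ADFTutorialResourceGroup" `
    -DataFactoryName "MyFirstDataFactory" -Name "CopyPipeline" -Force

New-AzDataFactoryPipeline -ResourceGroupName "ADFTutorialResourceGroup" `
    -DataFactoryName "MyFirstDataFactory" -File ".\CopyPipeline-Copy2.json"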
Next steps
- For more information about datasets, see the Create datasets article.
- For more information about how pipelines are scheduled and executed, see the Scheduling and execution in Azure Data Factory article.