Skip to main content

Sequenciamento de Pipelines

A plataforma Carol permite a criação de um encadeamento de pipelines, ou seja, permite o disparo na execução de um conjunto pipelines seguindo uma sequência linear ou não-linear, aproveitando dos resultados de pipelines anteriores como entrada em uma próxima pipeline.

Benefício desta funcionalidade: Otimização de custos com slots de processamento e maior produtividade no desenvolvimento.

Orquestração de Pipelines

Configuração

A configuração é realizada no arquivo de manifesto por meio do parâmetro nextPipelines que é um array de valores. O parâmetro de agendamento cronExpressions é ignorado para as pipelines encadeadas, ou seja, somente a pipeline raiz que inicia o encadeamento tem seu agendamento respeitado.

Exemplo de Manifesto
{
"defaults": {
"prepareScripts": [
"prepare.sql"
]
},
"pipelines": [
{
"pipelineName": "product",
"pipelineDescription": "Pipeline for the product table, that uses staging nlp_product",
"outputDataModelName": "mdmproduct",
"overlapDeltaMinutes": 30,
"useBatchNotification": false,
"checkExistsDataToProcess": false,
"sendTo": {
"bigquery": {
"customer": true,
"unified": true
},
"subscriptions": {
"customer": false,
"unified": false
},
"realtime": false
},
"clear": {
"bigquery": {
"unified": false,
"customer": false
},
"subscriptions": {
"unified": false,
"customer": false
},
"realtime": false
},
"nextPipelines": ["purchaseproducts"],
"cronExpressions": [
"0 0/1 * ? * * *"
],
"prepareScripts": [
"product_prepare.sql"
],
"processScript": "product.sql",
"sourceEntities": {
"dataModels": [
{
"dataModelName": "mdmproduct"
}
],
"stagings": [
{
"connectorName": "nlp",
"stagingName": "product"
}
]
}
},
{
"pipelineName": "purchaseproducts",
"pipelineDescription": "Pipeline to list purchase order table, that uses staging nlp_product",
"outputDataModelName": "mdmpurchaseorder",
"overlapDeltaMinutes": 30,
"useBatchNotification": false,
"sendTo": {
"bigquery": {
"customer": true,
"unified": true
},
"subscriptions": {
"customer": false,
"unified": false
},
"realtime": false
},
"clear": {
"bigquery": {
"unified": false,
"customer": false
},
"subscriptions": {
"unified": false,
"customer": false
},
"realtime": false
},
"nextPipelines": ["farhorizon"],
"processScript": "purchaseorder2.csql",
"sourceEntities": {
"dataModels": [
{
"dataModelName": "mdmpurchaseproducts"
}
],
"stagings": [
{
"connectorName": "nlp",
"stagingName": "purchaseorder"
}
]
}
},
{
"pipelineName": "farhorizon",
"pipelineDescription": "Far Horizon Data Model, that uses staging nlp_product",
"outputDataModelName": "mdmfarhorizon",
"overlapDeltaMinutes": 30,
"useBatchNotification": false,
"sendTo": {
"bigquery": {
"customer": true,
"unified": true
},
"subscriptions": {
"customer": false,
"unified": false
},
"realtime": false
},
"clear": {
"bigquery": {
"unified": false,
"customer": false
},
"subscriptions": {
"unified": false,
"customer": false
},
"realtime": false
},
"processScript": "purchprodnewsql2.csql",
"sourceEntities": {
"dataModels": [
{
"dataModelName": "mdmfarhorizon"
}
],
"stagings": [
{
"connectorName": "nlp",
"stagingName": "product"
},
{
"connectorName": "nlp",
"stagingName": "purchaseorder"
}
]
}
}
]
}

Exemplo de pipelines Pipelines Sequencing Exemplo de tasks Task Sequencing