Data Subscription
A Carol disponibiliza diversas formas de obter ou consumir dados, como por exemplo por meio de consultas a API REST, Data Subscription e etc.
Dúvidas sobre o que é uma assinatura de dados? → Veja mais informações na seção de conceitos.
Configurando Data Subscription
Crie uma subscription para o data model na tenant de cliente acessando as configurações avançadas deste data model e preencha os campos para configurar o envio desta subscription para o aplicativo personalizado do cliente como no exemplo abaixo.
Parâmetros:
Subscription Name
: Nome dado a esta configuração.Connector
: seleção do conector a partir de uma lista de conectores disponível na tenant.Start date & time
: data e hora para início do serviço.Content-Encoding
: seleção da técnica de compactação de dados para o envio de mensagens pelo transporte HTTP.Webhook URL
: endereço do endpoint de serviço HTTP (destino) que irá receber os eventos de assinatura da Carol (origem).Max Batch Size
: O número máximo de registros que a Carol deve agrupar antes de enviá-los ao tópico. É esperado um valor entre 10 e 10000. O padrão caso esteja em branco é 1000 registros.Max in Flight
: Este é o número máximo de mensagens permitidas sem confirmação antes que Carol comece a reter novas mensagens. Carol pode tentar novamente confirmações pendentes de acordo com a estratégia de retirada especificada na documentação. É esperado um valor entre 5 e 100. O padrão é 15 envios paralelos.Max Retries
: Este é o número máximo de tentativas que a Carol irá executar o reenvio de uma mensagem que apresentou falha na entrega (nAck) antes de finalmente colocá-la na Dead Letter Queue ou fila de mensagens mortas. O mínimo é de 5 tentativas, caso seja informado um valor menor será assumido o valor mínimo. O máximo é de 99 tentativas, caso seja informado um valor maior será assumido o valor máximo.List of headers
: Os cabeçalhos HTTP permitem que o cliente e o servidor passem informações adicionais com uma solicitação ou resposta HTTP. Consistem de um par chave-valor em formato de string de texto não criptografado separados por dois pontos.Status
: situação atual da subscrição criada que pode serRunningouPaused.
Simular Recebimento Data Subscription
Para simular o Webhook URL
do aplicativo personalizado no cliente, utilizamos na imagem acima a ferramenta Pipedream. Caso tenha interesse em simular, siga as instruções logo abaixo em: Configurando o Pipedream.
Configurando o Pipedream
- Acesse a funcionalidade
Sources
no menu lateral esquerdo e clique no botãoNew +
para adicionar um nova fonte. - Selecione
HTTP / Webhook
como tipo de fonte. - Selecione a opção
New Requests
. - Adicione os campos
Body Only
,Response Status Code
,Response Content-Type
eResponse Body
, preenchendo os valores como apresentado na imagem abaixo. - Após salvar a configuração você será encaminhado para uma página contendo o endereço webhook criado que será utilizado como parâmetro em um dos campos de configuração da subscription.
É possível enviar um ACK total, usando esse formato de Json:
{
"ack": true
}
Também é possível efetuar o ACK parcial, enviando os mdmID que foram recebidos e processados. Com isso a Carol vai reorganizar as mensagens para reenvio dos mdmID que não foram confirmados:
{
"ack": true,
"redeliverRecordIds": [
"{golden_record_id_1}",
"{golden_record_id_2}"
]
}
Outra forma de responder ao Data Subscription é enviando o nACK, que indica rejeitar a mensagem:
{
"ack": false,
"message": "Failed constraint xyz"
}
Com isso a Carol vai reenviar o registro posteriormente. Importante adicionar uma mensagem com o motivo da rejeição do registro via Data Subscription.
Alimentando a fila do Data Subscription
Com a assinatura configurada o processo para alimentar a fila ocorre por meio do fluxo abaixo:
No fluxo de entrada ou ingestão na Carol, os dados brutos oriundos de fontes externas (ERP) são enviados através de um conector e recebidos na tenant do cliente sendo armazenados dentro de tabelas de preparo, denominadas staging tables (STG).
Este mesmo processo por sua vez, inicia imediatamente a cópia dos dados brutos para a staging da tenant unificada do carol app instalado no cliente, que por sua vez, possui uma pipeline com agendamento configurado.
A execução desta pipeline dá início ao fluxo de processamento na tenant unificada que realiza o tratamento dos dados que encontram-se na staging gerando registros válidos (golden) que são armazenados dentro dos data models, que por sua vez são retornados aos data models na tenant do cliente pela própria tarefa de processamento SQL.
Por fim, havendo neste data model da tenant cliente uma assinatura de dados configurada, os dados recebidos serão enviados a um serviço externo (webhook) para consumo por aplicativos personalizados do cliente.
- BigQuery
- Real-time
O manifesto da pipeline precisa ser configurado com o parâmetro:
"sendToSubscriptions": true
, para que possa passar uma query compatível para alimentar a fila do Data Subscription.
Endpoint: /api/v3/bigQuery/processQuery
Método HTTP: POST
Payload: query compatível com a camada big query
.
O endpoint abaixo está disponível especificamente para ambientes com a camada Real-time habilitada. Esse endpoint permite passar uma query compatível com a camada Real-time para alimentar a fila do Data Subscription.
Endpoint: /api/v1/subscription/${dataSubscriptionId}/feedQuery
Método HTTP: POST
Payload: query compatível com a camada real-time
.
Recebimento de mensagens
A Carol efetua o envio de mensagens no formato abaixo:
{
"messageId": "2453103412224342",
"publishedAt": "2021-05-28T22:38:56.188538Z",
"records":
[
{
"mdmCounterForEntity": 1622241532763000,
"mdmCreated": "2021-05-28T22:38:52.763Z",
"mdmDeleted": false,
"mdmEntityTemplateId": "eec1dd50a14546a6894e7636ba03f62f",
"mdmEntityType": "apinvoiceaccountingGolden",
"mdmGoldenFieldAndValues":
{
"amount": 1000,
"account_id": "f768115f14863655f390e996d125699b",
"invoice_id": "40b090c2867f26aa6f13918430e5a58c",
"invoiceaccounting_id": "00434889e9627d466ecf4f40da482921",
"_orgid": "1728332a9cddf2f391aa7b48a23255e2",
"costcenter_id": "be1f4298cd0eb0733a7c92bb3a8ae034"
},
"mdmId": "d3f1c0cf246bf087ff001a1d98dc4125",
"mdmLastUpdated": "2021-05-28T22:38:52.763Z",
"mdmMasterCount": 1,
"mdmPreviousIds":
[],
"mdmTenantId": "3895e1c66df14611bc017a69594a3894"
}
],
"subscriptionId": "624bccd237c44ddc883bbc7689c4bd83",
"subscriptionName": "apinvoiceaccounting",
"subscriptionType": "GOLDEN",
"tenantId": "3895e1c66df14611bc017a69594a3894"
}
Observações sobre a mensagem recebida:
- As mensagens podem conter dentro de "records" a quantidade máxima de registros informado em "Batch Size" do subscription.
- Um documento em "records" significa um golden record na plataforma Carol.
- Eventos de eliminação de registro são propagados com o atributo "mdmDeleted" com o valor "true" por padrão. Todas as columas do Golden Record são enviados para facilitar o processo de integração e identificação do registro por parte do endpoint.
Rastreando Mensagens
A Carol disponibiliza o acesso aos dados de Data Subscription de três formas:
Org Admin
O usuário OrgAdmin terá acesso através da home page para visualizar graficamente as mensagens com o ack ou nAck de todos os ambientes abaixo desta organização.
Caso o usuário queira obter mais detalhes sobre as mensagens, poderá acessar clicando no link See more details no rodapé à direita do gráfico acima.
Tenant Admin
O usuário Tenant Admin terá acesso através da home page para visualizar graficamente as mensagens com o ack ou nAck.
Caso o usuário queira obter mais detalhes sobre as mensagens, poderá acessar clicando no link See more details no rodapé à direita do gráfico acima.
BigQuery
Neste fluxo, é possível executar queries numa tabela integrada ao Stackdriver permitindo consultar os logs através de queries.
A execução desta query deve ocorrer no Carol Insights Studio, na conexão "carol-bigquery". Abra um chamado caso você tenha alguma dúvida.
Algumas queries de referência estão disponíveis:
- Consultar mensagens ligadas a um Golden Record (mdmID) subscription
select timestamp ts, *
from (
select
trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'tenant ')+6, 33)) tenantId,
*
from `a_logs.stdout`
) l
inner join (select distinct tenantId, tenantname, orgname from `da1c1280ac9b11e68e250401f8d88501`.dm_caroltenant) t on t.tenantid = l.tenantId
where jsonPayload.message like '%74512f15ede946348daf9bccd587d886%'
order by timestamp desc
limit 100
- Agregado do tipo de resposta das mensagens Data Subscription
select l.tenantId, t.tenantname, t.orgname, max(timestamp)
-- , array_agg(struct(responseCode, timestamp))
from (
select
trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'tenant ')+6, 33)) tenantId,
(
case
when SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, ' returned ')+10, 3) = "201" then "201"
when strpos(jsonPayload.message, '503 - Service Temporarily Unavailable') > 0 then "503"
when strpos(jsonPayload.message, '401 - Unauthorized. Golden') > 0 then "401"
else SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'message id')+29, 15)
END
) responseCode,
*
from `a_logs.stdout`
order by timestamp desc
) l
inner join (select distinct tenantId, tenantname, orgname from `da1c1280ac9b11e68e250401f8d88501`.dm_caroltenant) t on t.tenantid = l.tenantId
where
responseCode in ("401", "503")
and timestamp BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 4 HOUR) AND CURRENT_TIMESTAMP()
group by l.tenantId, t.tenantname, t.orgname
order by 4 desc
- Consultar tempo mínimo, máximo, médio e mediano da entrega de mensagens
with data as (
select ts, COALESCE[took, tookk] as took, * except(ts, took, tookk) from (
select timestamp ts, * except (timestamp, jsonpayload)
from (
select
trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'tenant ')+6, 33)) tenantId,
trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, ' Response of webhook ')+21, 33)) subscriptionId,
safe_cast((replace(trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'Created - ')+10, (strpos(jsonPayload.message, 'ms - '))-strpos(jsonPayload.message, 'Created - '))), 'ms - {"ack','')) as int64) took,
safe_cast((replace(trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'returned 200 - OK - ')+20, (strpos(jsonPayload.message, 'ms - '))-strpos(jsonPayload.message, 'returned 200 - OK -'))), 'ms - {"ack":true,"re','')) as int64) tookk,
a.timestamp, a.jsonPayload
from `labs-app-mdm-production.a_logs.stdout` a
) l
left join (select distinct tenantId, tenantname, orgname from `labs-app-mdm-production.da1c1280ac9b11e68e250401f8d88501`.dm_caroltenant) t on t.tenantid = l.tenantId
where timestamp between timestamp_sub(current_timestamp(), interval 1 DAY) and timestamp_sub(current_timestamp(), interval 0 DAY)
)
),
mediam as (
select distinct subscriptionId, percentile_disc(took, 0.5) over (partition by subscriptionId) _median
from data
)
select *
from (
select d.subscriptionId, min(took) _min, max(took) _max, avg(took) _avg, count(*) _total, max(ts) max_ts, min(ts) min_ts
from data d
where took is not null
group by subscriptionId
order by 3 desc
) d
inner join mediam m on d.subscriptionId = m.subscriptionId
Código fonte projeto exemplo Data Subscription
O repositório abaixo possui um exemplo implementado em NodeJS que abre uma API para receber mensagens do Data Subscription.
https://github.com/totvslabs/caroljs/
Se você tiver alguma dúvida ou sugestão, você pode efetuar a abertura de uma issue para entrar em contato conosco.