Skip to main content

Data Subscription

A Carol disponibiliza diversas formas de obter ou consumir dados, como por exemplo por meio de consultas a API REST, Data Subscription e etc.

Dúvidas sobre o que é uma assinatura de dados? Veja mais informações na seção de conceitos.

Configurando Data Subscription

Crie uma subscription para o data model na tenant de cliente acessando as configurações avançadas deste data model e preencha os campos para configurar o envio desta subscription para o aplicativo personalizado do cliente como no exemplo abaixo.

Data Subscription Configuration

Parâmetros:

  • Subscription Name: Nome dado a esta configuração.
  • Connector: seleção do conector a partir de uma lista de conectores disponível na tenant.
  • Start date & time: data e hora para início do serviço.
  • Content-Encoding: seleção da técnica de compactação de dados para o envio de mensagens pelo transporte HTTP.
  • Webhook URL: endereço do endpoint de serviço HTTP (destino) que irá receber os eventos de assinatura da Carol (origem).
  • Max Batch Size: O número máximo de registros que a Carol deve agrupar antes de enviá-los ao tópico. É esperado um valor entre 10 e 10000. O padrão caso esteja em branco é 1000 registros.
  • Max in Flight: Este é o número máximo de mensagens permitidas sem confirmação antes que Carol comece a reter novas mensagens. Carol pode tentar novamente confirmações pendentes de acordo com a estratégia de retirada especificada na documentação. É esperado um valor entre 5 e 100. O padrão é 15 envios paralelos.
  • Max Retries: Este é o número máximo de tentativas que a Carol irá executar o reenvio de uma mensagem que apresentou falha na entrega (nAck) antes de finalmente colocá-la na Dead Letter Queue ou fila de mensagens mortas. O mínimo é 5 tentativas.
  • List of headers: Os cabeçalhos HTTP permitem que o cliente e o servidor passem informações adicionais com uma solicitação ou resposta HTTP. Consistem de um par chave-valor em formato de string de texto não criptografado separados por dois pontos.
  • Status: situação atual da subscrição criada que pode ser Running ou Paused.
Dica

Para simular o Webhook URL do aplicativo personalizado no cliente, utilizamos na imagem acima a ferramenta Pipedream. Caso tenha interesse em simular, siga as instruções logo abaixo em: Configurando o Pipedream.

Configurando o Pipedream
  1. Acesse a funcionalidade Sources no menu lateral esquerdo e clique no botão New + para adicionar um nova fonte.
    pipedream new
  2. Selecione HTTP / Webhook como tipo de fonte.
  3. Selecione a opção New Requests.
  4. Adicione os campos Body Only, Response Status Code, Response Content-Type e Response Body, preenchendo os valores como apresentado na imagem abaixo.
    pipedream config
  5. Após salvar a configuração você será encaminhado para uma página contendo o endereço webhook criado que será utilizado como parâmetro em um dos campos de configuração da subscription.

Alimentando a fila do Data Subscription

Com a assinatura configurada o processo para alimentar a fila ocorre por meio do fluxo abaixo:

No fluxo de entrada ou ingestão na Carol, os dados brutos oriundos de fontes externas (ERP) são enviados através de um conector e recebidos na tenant do cliente sendo armazenados dentro de tabelas de preparo, denominadas staging tables (STG).

Este mesmo processo por sua vez, inicia imediatamente a cópia dos dados brutos para a staging da tenant unificada do carol app instalado no cliente, que por sua vez, possui uma pipeline com agendamento configurado.

A execução desta pipeline dá início ao fluxo de processamento na tenant unificada que realiza o tratamento dos dados que encontram-se na staging gerando registros válidos (golden) que são armazenados dentro dos data models, que por sua vez são retornados aos data models na tenant do cliente pela própria tarefa de processamento SQL.

Por fim, havendo neste data model da tenant cliente uma assinatura de dados configurada, os dados recebidos serão enviados a um serviço externo (webhook) para consumo por aplicativos personalizados do cliente.

O manifesto da pipeline precisa ser configurado com o parâmetro: "sendToSubscriptions": true, para que possa passar uma query compatível para alimentar a fila do Data Subscription.

Endpoint: /api/v3/bigQuery/processQuery

Método HTTP: POST

Payload: query compatível com a camada big query.

Rastreando Mensagens

A Carol disponibiliza o acesso aos dados de Data Subscription de três formas:

Org Admin

O usuário OrgAdmin terá acesso através da home page para visualizar graficamente as mensagens com o ack ou nAck de todos os ambientes abaixo desta organização.

subscription org dashboard

Caso o usuário queira obter mais detalhes sobre as mensagens, poderá acessar clicando no link See more details no rodapé à direita do gráfico acima.

subscription details

Tenant Admin

O usuário Tenant Admin terá acesso através da home page para visualizar graficamente as mensagens com o ack ou nAck.

subscription dashboard

Caso o usuário queira obter mais detalhes sobre as mensagens, poderá acessar clicando no link See more details no rodapé à direita do gráfico acima.

subscription details

BigQuery

Neste fluxo, é possível executar queries numa tabela integrada ao Stackdriver permitindo consultar os logs através de queries.

Execução query

A execução desta query deve ocorrer no Carol Insights Studio, na conexão "carol-bigquery". Abra um chamado caso você tenha alguma dúvida.

Algumas queries de referência estão disponíveis:

  • Consultar mensagens ligadas a um Golden Record (mdmID) subscription
SQL
select timestamp ts, *
from (
select
trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'tenant ')+6, 33)) tenantId,
*
from `a_logs.stdout`
) l
inner join (select distinct tenantId, tenantname, orgname from `da1c1280ac9b11e68e250401f8d88501`.dm_caroltenant) t on t.tenantid = l.tenantId
where jsonPayload.message like '%74512f15ede946348daf9bccd587d886%'
order by timestamp desc
limit 100
  • Agregado do tipo de resposta das mensagens Data Subscription
SQL
select l.tenantId, t.tenantname, t.orgname, max(timestamp)
-- , array_agg(struct(responseCode, timestamp))
from (
select
trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'tenant ')+6, 33)) tenantId,
(
case
when SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, ' returned ')+10, 3) = "201" then "201"
when strpos(jsonPayload.message, '503 - Service Temporarily Unavailable') > 0 then "503"
when strpos(jsonPayload.message, '401 - Unauthorized. Golden') > 0 then "401"
else SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'message id')+29, 15)

END
) responseCode,
*
from `a_logs.stdout`
order by timestamp desc
) l
inner join (select distinct tenantId, tenantname, orgname from `da1c1280ac9b11e68e250401f8d88501`.dm_caroltenant) t on t.tenantid = l.tenantId
where
responseCode in ("401", "503")
and timestamp BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 4 HOUR) AND CURRENT_TIMESTAMP()
group by l.tenantId, t.tenantname, t.orgname
order by 4 desc
  • Consultar tempo mínimo, máximo, médio e mediano da entrega de mensagens
SQL
with data as (
select ts, COALESCE[took, tookk] as took, * except(ts, took, tookk) from (
select timestamp ts, * except (timestamp, jsonpayload)
from (
select
trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'tenant ')+6, 33)) tenantId,
trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, ' Response of webhook ')+21, 33)) subscriptionId,
safe_cast((replace(trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'Created - ')+10, (strpos(jsonPayload.message, 'ms - '))-strpos(jsonPayload.message, 'Created - '))), 'ms - {"ack','')) as int64) took,
safe_cast((replace(trim(SUBSTRING(jsonPayload.message, strpos(jsonPayload.message, 'returned 200 - OK - ')+20, (strpos(jsonPayload.message, 'ms - '))-strpos(jsonPayload.message, 'returned 200 - OK -'))), 'ms - {"ack":true,"re','')) as int64) tookk,
a.timestamp, a.jsonPayload
from `labs-app-mdm-production.a_logs.stdout` a
) l
left join (select distinct tenantId, tenantname, orgname from `labs-app-mdm-production.da1c1280ac9b11e68e250401f8d88501`.dm_caroltenant) t on t.tenantid = l.tenantId
where timestamp between timestamp_sub(current_timestamp(), interval 1 DAY) and timestamp_sub(current_timestamp(), interval 0 DAY)
)
),
mediam as (
select distinct subscriptionId, percentile_disc(took, 0.5) over (partition by subscriptionId) _median
from data
)

select *
from (
select d.subscriptionId, min(took) _min, max(took) _max, avg(took) _avg, count(*) _total, max(ts) max_ts, min(ts) min_ts
from data d
where took is not null
group by subscriptionId
order by 3 desc
) d
inner join mediam m on d.subscriptionId = m.subscriptionId