Loading ...

O Blog de Tecnologia da Geofusion

Introdução à Data Microservices

Introdução à Data Microservices

Data Microservices são microserviços com foco em processamento de dados, que comunicam-se via mensagens assíncronas. O encadeamento desses microserviços constitui um pipeline de dados.

Um pipeline de dados inicia-se com os microserviços de origens de dados, estes serviços são a entrada de dados do pipeline, e provêem uma maneira de ler os dados a partir de alguma fonte. Após isso aplica-se os microserviços de processamento, quantos forem necessários de maneira encadeada. Levando os dados até os destinos finais, que são os microserviços que podem por exemplo, persistir os dados em alguma base de dados, ou até mesmo simplesmente servir log. A ideia é similar aos processamentos ETL (Extract, Transform and Load), com a diferença que, com as técnicas de microserviços aplicadas, o pipeline de dados pode escalar horizontalmente na nuvem, cada parte sob demanda; ser tolerante a falhas; e evoluir suas partes independentemente.

Essas características permitem que o processamento de dados possa ocorrer sem downtime total mesmo que você precise fazer um evolução de uma parte do pipeline, o que torna o conceito muito útil em cenários de Entrega Contínua.

Pipeline de Dados
Pipeline de Dados

O Spring Cloud Stream é um framework que permite a implementação de data microservices de maneira muito simples. A grosso modo com apenas duas anotações conseguimos transformar uma aplicação Spring Boot em um Data Microservice. O que é melhor, só precisamos definir a entrada e a saída de dados, enquanto a camada de mensageria é transparente à configuração. Com isso podemos trocar essa implementação para qual desejarmos, seja por exemplo, RabbitMQ, Kafka, Redis, Apache Geode.

As duas anotações que precisamos @EnableBinding, e nela dizemos se o serviço é um Processor (uma entrada e uma saída de dados), um Sink (uma entrada de dados), ou um Source (uma saída de dados). E uma segunda anotação — que varia com tipo de microserviço — em que você indica os canais de input e/ou output.

Com Spring Cloud Stream temos uma semântica de publicação e subscrição persistente, grupos de consumo de mensagem e particionamento de dados. Utilizando as principais features dele:

  • Mensageria persistente
  • Destinos nomeados (possibilita conectar varias fontes de dados a um mesmo destino)
  • Grupos de consumo (possibilita particionar o envio de mensagens a grupos diferentes)

O framework também tem suporte ao Project Reactor, que permite programação do fluxo de dados com uma abordagem reativa.

Show me the code!

Para explorar um pouco mais do Spring Cloud Stream, vamos desenvolver um pipeline de dados utilizando conjuntos de dados do Portal Brasileiro de Dados Abertos disponível em http://dados.gov.br/. Lá podemos encontrar um série de dados e informações públicas, você pode saber mais em: http://dados.gov.br/paginas/sobre.

Vamos seguir a seguinte proposta de aplicação:

Proposta de Aplicação
Proposta de Aplicação

A aplicação irá ler dados de duas fontes de dados diferentes, ambas são séries temporais. A primeira, do Sisu (Sistema de Seleção Unificada) apresenta informações relativas à quantidade de inscritos por municípios e ano. A segunda, do Fies (Fundo de Financiamento Estudantil) apresenta informações relativas ao número de contratos firmados por municípios e ano. Após a leitura dos dados eles serão enviados via mensagem um a um, utilizando um serviço Stateful, devido a maneira que os dados são disponibilizados no dataset.

Após isso, o dado será entregue a um logger, e a um Agregador, que irá receber os dados das duas fontes de dados e agregá-las em um único objeto, utilizando Spring Integration. E sua saída também irá para um logger.

Depois de agregados os dados irão para um outro processador que irá utilizar o código dos municípios dado pelo ibge, para ir realizar um requisição em uma URL que retorna um objeto com o nome e o estado do código solicitado.

Toda a comunicação acontece via um broker Kafka de forma transparente no código.

O código completo do pipeline pode ser encontrado nesse repositório:

https://github.com/arturgaleno/datamicroservices-example

Sources

Ambos os serviços utilizados retornam um único json com os dados, desses vamos aproveitar os objetos que possuem a seguinte estrutura:

public class ApiValue {
  @SerializedName(“valor”)
  private Long value;
  @SerializedName(“municipio_ibge”)
  private Long municipalityId;
  @SerializedName(“ano”)
  private Integer year;
}

E então vamos utilizar esses objetos para montar nosso primeiro Source. A anotação @EnableBinding(Source.class) informa ao Spring que essa classe será uma fonte de dados do nosso pipeline e anotação @InboundChannelAdapter serve para configurarmos o método que lançará nossos dados para o serviço de mensageria de saída.

@EnableBinding(Source.class)
public class FiesSource {
 @Autowired
 private MecTimeSeriesClient mecTimeSeriesClient;
 private StatefulTimeSeriesService statefulTimeSeriesService;
 @Bean
 @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = “1”))
 public MessageSource<ApiValue> fiesMessageSource() {
   if (statefulTimeSeriesService == null) {
     statefulTimeSeriesService = new StatefulTimeSeriesService(
      mecTimeSeriesClient.getFiesTimeSeries().getValues()
     );
   }
   return () -> {
     ApiValue next = statefulTimeSeriesService.getNext();
     next.setSourceType(SourceType.FIES);
     return new GenericMessage<>(next);
   };
 }
}

Para não ficar chato, colocarei apenas o source no Fies para servir como exemplo. O resto do código está disponível no repositório do GitHub.

Processors

Iremos ler duas fontes de dados diferentes, e queremos agregar os dados das duas em apenas uma representação. Para isso usaremos a api do Spring Integration juntamente com a do Spring Cloud Stream. De maneira assíncrona, a medida que as mensagens vão chegando, vamos agregá-las utilizando o código de município do IBGE como informação de correlação.

A classe que descreve nosso valor agregado terá a seguinte estrutura:

public class AggregatedValue {
  private Long sisuValue;
  private Long fiesValue;
  private Long municipalityId;
  private Integer year;
}

E nosso agregador será em simples, graças ao Spring Integration. Novamente a anotação @EnableBinding, só que dessa vez, ela indica que nossa classe é um Processor. E as anotações @Aggregator, @CorrelationStrategy e @ReleaseStrategy são do Spring Integration. A primeira é relacionada ao método que executa a agregação, a segunda representa a regra de correlação, e terceira a regra e quando o conjunto de dados correlacionado poderá ser lançado para o método e agregação.

@EnableBinding(Processor.class)
public class ApiAggregator {
  private static final Gson GSON = new Gson();
  @Aggregator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
  public AggregatedValue aggregatingMethod(List<String> items) {
    AggregatedValue aggregatedValue = new AggregatedValue();
    items.forEach(s -> {
      ApiValue apiValue = GSON.fromJson(s, ApiValue.class);
      aggregatedValue.setYear(apiValue.getYear());
      aggregatedValue.setMunicipalityId(
        apiValue.getMunicipalityId());
      handleSourceType(apiValue, aggregatedValue);
    });
    return aggregatedValue;
  }
  private void handleSourceType(ApiValue item, AggregatedValue aggregatedValue) {
    switch (item.getSourceType()) {
      case FIES:
        aggregatedValue.setFiesValue(item.getValue());
        break;
      case SISU:
        aggregatedValue.setSisuValue(item.getValue());
        break;
    }
  }
  @CorrelationStrategy
  public String correlateBy(String item) {
    ApiValue apiValue = GSON.fromJson(item, ApiValue.class);
    return apiValue.getMunicipalityId().toString()
      + apiValue.getYear().toString();
  }
  @ReleaseStrategy
  public boolean canMessagesBeReleased(List<String> msgs) {
    return msgs.size() == 2;
  }
}

Como disposto na proposta do sistema, depois desse primeiro Processor, temos um segundo que transforma dos códigos de municípios do IBGE em um objeto mais completo, com o nome do mesmo. Para isso vamos utilizar uma URL que dado um código do IBGE retorna um Json com o código, nome e o estado do mesmo. Após isso teremos nossa informação final com os dados do Fies e Sisu agregados, e com um enriquecimento simples.

O json que descreve as informações do município tem a seguinte estrutura:

public class Municipality {
  @SerializedName(“c”)
  private Long municipalityId;
  @SerializedName(“n”)
  private String name;
  @SerializedName(“s”)
  private String state;
}

E o json das informações enriquecidas, portanto:

public class EnrichedInfo {
  private Long sisuValue;
  private Long fiesValue;
  private Municipality municipality;
  private Integer year;
}

O método desse Processor tem a seguinte estrutura abaixo, é interessante notar que nesse caso, o método @Transformer é diferente do utilizando anteriormente. Mas o objetivo é o mesmo, serve como canal de entrada e saída no nosso processamento.

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public EnrichedInfo transform(String msg) {
  …
}

Sinks

Neste projeto teremos um Sink simples, que servirá apenas para realizar logs, e possui a seguinte estrutura:

@EnableBinding(Sink.class)
public class LogSink {
  private Logger LOGGER = LoggerFactory.getLogger(LogSink.class);
  @StreamListener(value = Sink.INPUT)
  public synchronized void logMessage(Object msg) {
    LOGGER.info(“Received: “ + msg);
  }
}

Juntando todas as partes

Nossos microserviços não possuem programaticamente conhecimento nenhum um dos outros, nem dos canais de entrada e saída de cada um. A única coisa que eles conhecem, e de maneira transparente, é sua responsabilidade, e que deverão jogar os dados para algum serviço de mensageria.

Para encadea-los oferecemos essas informações no momento da execução dos serviços. Todos os comandos para executar o projeto descrito está no repositório do GitHub.

Para exemplificar o comando para executar o Source é o seguinte:

$ java -jar sources/target/sources-1.0-SNAPSHOT.jar \
 — spring.cloud.stream.bindings.output.destination=mecsource-input \
 — server.port=8091 \
 — spring.cloud.stream.bindings.output.content-type=application/json;charset=UTF-8

Note que o parâmetro spring.cloud.stream.bindings.output.destination nomeia o nome do canal de saída do Source. Para executar o Processor (comando abaixo), o parâmetro spring.cloud.stream.bindings.input.destination é utilizado com o mesmo nome da destinação do Source. E isso provê as conexões entre os canais de comunicação.

$ java -jar api-aggregator/target/api-aggregator-1.0-SNAPSHOT.jar \
 — spring.cloud.stream.bindings.input.destination=mecsource-input \
 — spring.cloud.stream.bindings.output.destination=apiagregator-output \
 — server.port=8092 \
 — spring.cloud.stream.bindings.output.content-type=application/json;charset=UTF-8 \
 — spring.cloud.stream.bindings.input.content-type=application/json;charset=UTF-8

Concluindo

Vemos que o Spring Cloud Stream é uma tecnologia promissora para processamento de dados na nuvem e facilita a implementação de microserviços assíncronos e desacoplados. As fontes de dados utilizadas, infelizmente, não são tão amigáveis para serem utilizadas, o serviço de busca das informações do código de município do IBGE, retorna um Json com o Content-Type da resposta como text/html. E não foi fácil de achar, esbarrei com ele no console do meu navegar na aba em que observamos os requests que acontecem.

Além disso, muitas das fontes de dados oferecidas pelo portal de dados abertos são igualmente difíceis de consumir por falta de paginação, e inconsistentes com as boas práticas de web API’s.

Mas ainda assim, há variados tipos de informações disponíveis no Portal de Dados Abertos, e apesar de não serem oferecidos da melhor maneira, eles estão lá prontos para serem processados e utilizados.

 

Geofusion.tech

2016 todos os direitos reservados.

www.geofusion.com.br