hadoop_ Usando o Hadoop Distributed File System (HDFS) O Hadoop File System (HDFS) é o componente do Hadoop responsável pela persistência de dados de forma distribuída, constituindo a principal funcionalidade do Hadoop como solução de Big Data. Como o próprio nome determina, ele segue o princípio de um sistema de arquivos, composto por diretórios e arquivos de diferentes formatos, sem nenhuma regra de relacionamento ou indexação entre si. As operações disponíveis para usuários ou programas clientes são as mesmas de um sistema de arquivos de um sistema operacional: criação/remoção de diretórios, criação/remoção de arquivos, escrita de arquivos (apenas no fim dos arquivos, conhecido como append, suportado a partir da versão 0.20.X) e leitura de arquivo (não há acesso randômico). Apresenta um shell para interação via linha de comando e uma API Java para manipulação via código. Seus comandos e sua estrutura de árvores de diretórios e permissões de acesso foram inspirados no linux. No artigo sobre Hadoop da edição 52 podem ser encontradas informações sobre os principais comandos shell para criação de uma estrutura de pastas e arquivos a serem processados com MapReduce. O grande benefício e objetivo do Hadoop é armazenar os dados de seu sistema de arquivos de forma distribuída. É replicando e particionando seus dados por diversas máquinas em um cluster que o Hadoop se torna capaz de armazenar e gerenciar petabytes de dados, sendo capaz de armazenar, por exemplo, um arquivo de 100 terabytes, algo que sobrepujaria a maioria dos sistemas de arquivos. É desta forma que o HDFS endereça o problema denominado Big data, onde os dados crescem de forma tão intensa e complexa que se torna difícil armazená-los e processá-los através das ferramentas convencionais. Para tornar tal funcionalidade transparente para o cliente (ao interagir com o Hadoop o usuário não sabe em que nós ou segmentos de rede o arquivo se encontra) é necessário separar os requisitos em componentes com diferentes responsabilidades, / 44 como replicação, divisão de arquivos em blocos, endereçamento de cada bloco, atualização do endereço de blocos, monitoramento dos nós, armazenamento dos dados. Todos esses requisitos enumerados no parágrafo anterior são endereçados pelos principais subcomponentes do HDFS. São eles: NameNode, SecondaryNameNode e DataNode. Cada um desses componentes executa como um daemon, em uma própria instância da JVM, sendo o NameNode o processo mestre, que gerencia os processos escravos, chamados DataNodes. Já o SecondaryNameNode é um processo de administração do NameNode que apesar do nome sugestivo, nada tem a ver com backup. NameNode O NameNode é o processo master do HDFS. Todas as operações de manipulação de arquivos ou pastas do HDFS são tratadas pelo NameNode. Isso porque é ele quem contém as informações de em quais nós ao longo do cluster estão cada segmento de arquivo salvo no HDFS, bem como os diretórios existentes e suas permissões de acesso. Como todo dado salvo no HDFS é persistido em arquivo (binário ou texto), arquivos representando grandes DataSets (como, por exemplo, total de mensagens trocadas entre usuários de uma rede social) logo excederiam a capacidade física do HD. Para resolver tal desafio, o NameNode divide o arquivo em blocos de tamanho predefinido e define para quais DataNodes cada um desses segmentos será enviado. Além disso, a configuração do NameNode também define a quantidade de réplicas de cada bloco para mitigar possíveis falhas de nós. O NameNode definirá quais nós receberão cada uma das réplicas, balanceando a carga de uso de cada nó e levando em conta também o segmento de rede em que cada nó se encontra. É importante ressaltar que o NameNode em momento algum manipula os dados persistidos no HDFS. Ele apenas coordena as operações a serem Wellington Ramos Chevreuil | [email protected] Engenheiro de Computação, SCJP, SCJD, SCMAD, SCWCD, SCBCD, SCEA. Atualmente utilizando hadoop e mapreduce em projetos de análise de grande volume de dados e geração de recomendação e demais conteúdos em serviços de entretenimento via web. É também administrador Hadoop certificado pela Cloudera. Fábio Moreira de Almeida | [email protected] Analista de sistemas formado pela UTAM, especialista em Desenvolvimento em Software Livre pela UEA. Mestrando do curso de Recuperação da Informação pela UFAM. Atualmente, pesquisador do Instituo Nokia de Tecnologia (INdT), trabalhando com o hadoop em sistemas de recomendação e inteligência contextual. É também administrador Hadoop certificado pela Cloudera. Flavio Eduardo de Lima | [email protected] Analista de Sistemas, CSM, CSD, SCJP, SCWCD, SCMAD, SCJA. Experiência com hadoop e seu ecossistema em projetos de análise de grande volume de dados. É também administrador Hadoop certificado pela Cloudera. Continuando a sequência de artigos sobre o Hadoop, será detalhado um de seus principais componentes, o Hadoop File System, muito comumente referenciado pela sigla HDFS. O presente artigo demonstrará o funcionamento geral do sistema de persistência de dados do Hadoop, explicando conceitos básicos de seu funcionamento, bem como a API Java para interação e manipulação do HDFS. Hadoop não é Banco de Dados Um malentendimento bastante comum notado em artigos de blogs, fóruns e mesmo revistas técnicas, é considerar o Hadoop um banco de dados NoSQL. Tal interpretação é equivocada, e como pode ser visto ao longo deste artigo, o HDFS não implementa conceitos de banco de dados. realizadas, provendo informações e instruções para que os programas clientes possam realizar essas operações diretamente com os nós onde os dados estão sendo gravados (ou de onde estão sendo lidos). O endereçamento dos dados que se encontram em cada nó ao longo do cluster é mantido em um arquivo do NameNode chamado fsimage. Por motivos de desempenho, quando o HDFS é iniciado, as informações do fsimage são carregadas para a memória do NameNode, evitando a necessidade de se realizar leitura de arquivo em disco sempre que tiver que tratar requisições de clientes. Conforme o estado do HDFS vai sendo alterado (novos arquivos vão sendo inseridos, outros removidos, pastas são alteradas, arquivos têm dados acrescentados, nós inteiros são removidos ou adicionados ao cluster), as informações sobre que nó contém quais blocos também vai sendo atualizada na memória do NameNode, deixando o fsimage desatualizado. Para que essa diferença entre a memória do NameNode e o fsimage não seja perdido para sempre no caso de um desligamento, o estado do NameNode é sempre mantido também em arquivos tempo- rários, chamados edits (simplesmente escrever cada nova operação concorrente em um novo arquivo é menos custoso do que tentar sincronizar toda vez o mesmo arquivo). Com o passar do tempo, o número de arquivos edit-log pode se tornar grande demais, tornando-se necessária uma atualização do fs-image. Essa é a função do SecondaryNameNode. Listagem 1. Consumo de memória armazenando-se a mesma quantidade de dados em um único arquivo em comparação com muitos. Um arquivo com 1GB de dados Mil arquivos de 1MB de dados Nome do arquivo = 1 descritor Mil nomes de arquivos = 1.000 descritores HDFS configurado para blocos de HDFS configurado para blono máximo 128MB, e para man- cos de no máximo 128MB, ter sempre 3 réplicas: e para manter sempre 3 ré8 blocos x 3 réplicas = 24 descri- plicas: tores de bloco 1.000 blocos x 3 réplicas = 3.000 descritores de bloco Total de descritores = 25 Total de descritores: 4.000 Consumo aproximado de 5 mil Consumo aproximado de bytes de RAM 800 mil bytes de RAM 45 \ Rack Awareness O Hadoop permite que se informe detalhes sobre a topologia da rede em que o cluster está configurado. Isso é muito útil para se obter melhor desempenho de execução de MapReduces (distribuindo tarefas entre nós dispostos no mesmo rack ou segmento de rede), mas também evitando indisponibilidade em caso de falha de um rack inteiro, mantendo sempre pelo menos uma réplica em um segmento diferente dos demais. SecondaryNameNode O SecondaryNameNode é o processo responsável por sincronizar os arquivos edits com a imagem do fsimage, para gerar um novo fsimage mais atualizado. Essa atividade é executada pelo SecondaryNameNode de forma agendada, definindo-se um intervalo de tempo em segundos no arquivo core-site.xml, conforme exibido na Listagem 2. Também pode ser disparada sempre que o total de edits atingir um limite de tamanho em bytes, predeterminado no arquivo de configuração, conforme Listagem 3. Poucos arquivos grandes consomem menos memória A forma de gerenciar arquivos pelo NameNode faz com que ao se planejar um cluster deva-se priorizar o uso de poucos arquivos grandes (vários GB a PB), ao invés de muitos arquivos pequenos. Isso porque cada arquivo e bloco de arquivo é representado no NameNode por descritores que ocupam, em média, 150-200 bytes de RAM. Um maior número de arquivos acaba resultando em um maior número de descritores na memória do NameNode. O exemplo da Listagem 1 descreve um cenário comparativo do consumo de memória com relação ao de arquivos no HDFS. combiná-los em um novo fsimage acarreta em um consumo elevado de RAM. Por este motivo, ele foi projetado para executar como um daemon separado do NameNode, evitando que essas atividades pudessem comprometer o desempenho do NameNode. Em clusters grandes, é altamente recomendado manter o SecondaryNameNode em uma máquina exclusiva. Como já foi citado, o nome SecondaryNameNode pode induzir ao erro de se concluir que o mesmo seja um NameNode de backup, ou mesmo uma redundância do serviço do NameNode, para o caso de uma eventual indisponibilidade. Tal avaliação está errada, não há no Hadoop serviço de redundância ou backup do NameNode. Tais estratégias devem ser planejadas e implementadas sob demanda, pelo administrador/ mantenedor do cluster. Para evitar essa confusão de conceitos, a versão 0.21 do Hadoop passou a denominá-lo como CheckpointNode. DataNode O DataNode é o processo responsável por armazenar blocos de arquivos gigantes que serão utilizados pela aplicação do usuário. Apesar de o NameNode desempenhar a maioria das funções administrativas do HDFS, os DataNodes detêm certa autonomia na manutenção dos dados e interação com clientes em processos de escrita e leitura. Por exemplo, cada DataNode se comunica com outros DataNodes para faListagem 2. Trigger de limite de tempo (em segunzer a replicação dos seus blocos (por default um bloco dos). é copiado entre 3 DataNodes). Essa é uma das razões pelas quais não se usa RAID para fazer cópias de se<property> guranças dos blocos. <name>fs.checkpoint.size</name> Em uma operação de leitura, o DataNode também <value>3600</value> é responsável por recuperar e transferir os blocos. <description>O número de segundos entre duas Uma vez que o NameNode identificou quais nós possincronias periódicas.</description> </property> suem blocos do arquivo solicitado, essa informação é mandada de volta para o cliente, e a comunicação passa ser direta com os DataNodes responsáveis por Listagem 3. Trigger de limite de tamanho de arquivo cada bloco. A figura 1 ilustra os passos de uma leitura (em bytes). de arquivo no HDFS por um cliente. Já em operações de escrita, a responsabilidade do <property> DataNode é ainda maior. A partir do momento que o <name>fs.checkpoint.size</name> NameNode define em quantos blocos o arquivo será <value>67108864</value> quebrado e quais os DataNodes que armazenarão <description>O tamanho do editlog (em bytes) que esses blocos, essas informações são enviadas para o funciona como um gatilho para uma sincronia periódica. cliente, que em seguida conecta ao primeiro DataÉ acionada mesmo que o fs.checkpoint.period não tenha Node da lista e passa a enviar os dados do arquivo expirado.</description> </property> para escrita. Na medida em que esse primeiro DataNode começa a receber os dados, ele próprio conecta O fato de o SecondaryNameNode ter que carre- ao segundo DataNode e repassa os dados. O segundo gar todo o fsimage em memória, mais os edits, para / 46 O NameNode envia instruções para o DataNode usando respostas nos próprios heartbeats. As instruções incluem comandos como replicar blocos, remover réplicas de blocos, solicitar relatórios de blocos ou atualizar o DataNode. Blocos No HDFS os arquivos são salvos em blocos entre os DataNodes. Na figura 3 é exibido um diagrama ilustrando como os arquivos Figura 1. Fases de uma operação de leitura de arquivo do HDFS por uma aplicação cliente. são armazenados no cluster. Cada arquivo é representaDataNode irá conectar ao próximo DataNode da lista do por dois blocos e replicado entre os DataNodes para enviar os dados, formando uma corrente de Dausando o valor padrão de replicação que é de 3 DataNodes. Assim que cada DataNode vai terminando taNodes. de realizar suas escritas, eles transmitem mensagens O tamanho padrão de um bloco no Hadoop é de de finalização da operação de volta na corrente até 67108864 (64Mb), mas para cenários de uso de arque essas cheguem ao cliente, o qual irá reportar ao quivos muito grandes, pode ser útil aumentar esse NameNode quando o bloco foi finalmente escrito. A tamanho para 134217728 (128Mb), com o objetivo figura 2 ilustra em detalhes todo esse processo. de reduzir os custos em buscas. Diferentemente dos Além de controlar as funções de leitura e escrita, outros sistemas de arquivos, um arquivo criado no cada DataNode tem a responsabilidade de atualizar Hadoop com tamanho inferior a um bloco não vai o NameNode com informações gerais de seu funcioocupar a capacidade total especificada para o bloco. namento. Para isso, o processo do DataNode envia Um bloco é representado por dois arquivos que mensagens ao NameNode para informar se está disestão armazenados no sistema de arquivos local onde ponível e quais blocos de arquivos armazenados por o processo do DataNode está rodando. O primeiro ele podem ser acessados ou não (possivelmente corcontém o dado propriamente dito e o segundo conrompidos). Essas mensagens também levam outras tém as informações sobre o bloco. estatísticas, como, por exemplo, espaço disponível no disco. Todos esses dados auxiliam o NameNode no gerenciamento geral do cluster. Essas mensagens en- API Java para manipulação de arquivos viada pelos DataNodes são chamadas de heartbeats. no HDFS As classes nativas de manipulação de arquivos do O intervalo default de heartbeats é 3 segundos. O Namenode vai considerar o DataNode indisponí- Java não funcionam com o HDFS. Para realizar opevel quando não receber heartbeats no período de 10 rações de manipulação de arquivos e diretórios no minutos. Figura 2. Fases de uma operação de escrita de arquivo do HDFS por uma aplicação cliente. 47 \ /para saber mais No artigo “Processando Dados com Hadoop” da edição 52 da MundoJ, foi explicada a origem do HDFS, os motivos que levaram à sua especificação e implementação, quais os cenários que inspiraram o seu desenvolvimento e quais requisitos ele se propunha resolver. O presente artigo visa cobrir o HDFS em mais “baixo nível”, revelando alguns detalhes de sua implementação e trazendo exemplos de código de como interagir e manipulá-lo programaticamente. A API Hadoop de manipulação de arquivos é genérica e pode ser usada para se trabalhar com outros sistemas de arquivos que não o HDFS, como, por exemplo, o sistema de arquivos local. Figura 3. Diagrama de armazenamento de arquivos em blocos. usar as classes de stream de dados da biblioteca padrão do Java. FSDataInputStream é uma subclasse da biblioteca padrão Java java.io.DataInputStream com suporte adicional para acesso randômico. Para escrever em um arquivo HDFS, existe uma classe análoga chamada FSDataOutputStream. A Listagem 5 exemplifica a leitura e escrita de arquivos no HDFS. O código realiza merge de dois ou mais arquivos no HDFS. HDFS via programação é preciso fazer uso das bibliotecas que o Hadoop provê no pacote org.apache.hadoop.fs. A figura 4 apresenta um diagrama de classes simplificado da API, com as classes mais comumente usadas em interações com o HDFS. O ponto de partida da API de manipulação de arquivos do Hadoop é a classe abstrata FileSystem, Listagem 5. Exemplo de merge de arquivos no HDFS. responsável por toda interação com o sistema de arquivos. Para conseguir uma instância desta classe é public void merge() throws IOException { preciso chamar o método FileSystem.get, como desConfiguration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf); monstrado na Listagem 4, considerando que o arquivo core-site.xml contendo as informações do host onde Path origem = new Path(PATH_1); o processo NameNode executa está no classpath: Listagem 4. Código para manipulação de arquivos usando API HDFS. Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf); Path destino = new Path(PATH_2); try { FileStatus[] inputFiles = hdfs.listStatus(origem); FSDataOutputStream out = hdfs.create(destino); for (int i = 0; i < inputFiles.length; i++) { System.out.println(inputFiles[i].getPath(). A classe Configuration é uma classe especial que getName()); armazena parâmetros de configuração de um Job FSDataInputStream in = hadoop, nela podemos especificar configurações do hdfs.open(inputFiles[i].getPath()); byte buffer[] = new byte[256]; cluster, por exemplo, propriedades como fs.default. int bytesRead = 0; name (o nome do sistema de arquivos padrão) que em while ((bytesRead = in.read(buffer)) > 0) { um cluser hadoop distribuído usualmente é o nome e out.write(buffer, 0, bytesRead); a porta para o namenode. Para hadoops configurados } como single-mode ou modo pseudodistribuído estas in.close(); configurações podem ser suprimidas. } A classe Path é a representação de arquivos e diout.close(); retórios e a classe FileStatus contém metadados so} catch (IOException e) { e.printStackTrace(); bre a mesma, como permissões, tamanho do arquivo, } hora da última modificação etc. O método listStatus da classe FIleSystem retorna } uma lista de objetos FileStatus com metadados dos Suporte à função Append arquivos e o número de elementos retornados repreVersões mais antigas do Hadoop não suportavam senta o número de arquivos de um diretório específia operação de append, portanto uma vez fechado o co. Para manipular o conteúdo dos arquivos podemos arquivo se mantinha imutável e somente podia ser / 48 estável e funciona apropriadamente para as demais situações. O método FSDataOutputStream.hsync() permite que os dados escritos sejam imediatamente descarregados para os DataNodes, permitindo que novos leitores possam lê-lo (sem a necessidade de fechar o arquivo). Muitas aplicações que rodam sobre o HDFS, como o hbase, flume etc.), utilizam os métodos de sincronia da classe FSDataOutputStream. Considerações finais Neste artigo, foram abordados os princípios do funcionamento do HDFS. Mostrou-se como o Hadoop torna possível o armazenamento de Figura 4. Diagrama de classes API Java para interação com o HDFS. Petabytes de dados de forma escalámodificado criando-se uma cópia com um nome difevel e tolerante a falhas, com uso otirente. Um arquivo não existia no HDFS até que tives- mizado de recursos de rede e de disco. O dilema de se sido fechado (método close da classe FSDataOu- armazenamento entre arquivos pequenos e grandes tputStream) com sucesso. Caso ocorresse um erro de foi apresentado, possibilitando que desenvolvedores escrita antes do fechamento, era como se o arquivo e administradores possam fazer melhor uso e manter nunca tivesse existido. um bom desempenho do sistema HDFS . Os primeiros passos em direção ao append foram Além dos detalhes arquiteturais do HDFS, foi exdados na versão 0.15.0, na qual era possível “enxer- plicada ainda a sua API Java, útil para implementação gar” arquivos abertos. Nesta versão o conteúdo de de componentes que interajam com o sistema de arum arquivo podia ser lido ao mesmo tempo em que quivos. Essa biblioteca permite controle completo do era escrito, embora somente o último bloco comple- sistema de arquivos por meio de código Java, confortamente escrito fosse visível (como vimos o hadoop me demonstrado nos exemplos do artigo. escreve arquivos em blocos). Por esta razão é possíTais informações são de grande importância no vel acompanhar a escrita em arquivo bloco-a-bloco planejamento de um cluster Hadoop como solução de através de ferramentas como o fs -tail, por exemplo. BigData, sendo suficientes para uma instalação básiO support ao append só foi inserido na versão 0.19.0, ca em um cenário de uso simples. Para casos de uso o que demandou substanciais mudanças uma vez que mais complexos, que envolvam integração do HDFS toda estrutura do hadoop havia sido desenvolvida ba- com bases de dados relacionais, importação de logs, seada em arquivos imutáveis. ou estruturação de dados de forma não relacional Após a funcionalidade de append ter sido dispo- com acesso randômico, existem ferramentas adicionibilizada, foram notados, em outubro de 2008, que nais do ecossistema Hadoop para serem usadas no uma das garantias básicas de uma funcionalidade de topo da configuração básica do HDFS. append, de que leitores podem ler dados que foram /referências escritos via método FSDataOutputStream’s sync() por Wiki do Hadoop 0.20.2 − http://cloud.ozyegin.edu.tr/ um escritor, não estava funcionando. Por conta deste Hadoop-UML-Diagrams/Documentation/html/index.html problema a funcionalidade de append foi desabilitada na versão 0.19.1, e na versão 0.20.0 o parâmetro de Hadoop API Docs − http://hadoop.apache.org/common/ configuração dfs.support.append, cujo o valor é falso docs/current/api/overview-summary.html por padrão, foi adicionado justamente para que fosse fácil habilitar e desabilitar a instável funcionalidade Livro Hadoop In Action, de Chuck Lam do append. Nas versões 0.20.x (versão legada estável) Livro Hadoop, The Definitive Guide, de Tom White e versão 1.0.X (versão corrente estável) a funcionalidade de append continua instável e é aconselhado File Appends in HDFS que somente seja habilitada em ambientes de dehttp://www.cloudera.com/blog/2009/07/file-appends-insenvolvimento ou teste. É importante salientar que hdfs/ embora a funcionalidade de append esteja instável na versão corrente do hadoop, a leitura concorrente está 49 \