Elasticsearch | river-jdbc incrémental

By | 18 mars 2014

elastic-searchJe travaille depuis une semaine sur la mise en place d’Elasticsearch en tant que data warehouse, et son exploitation depuis Kibana.

J’ai donc besoin d’indexer un certain nombre de documents provenant de source différentes. Comme beaucoup, 90% de mes données sont actuellement stockées dans des tables MySQL. Certaines d’entre elles contiennent pas loin de 100 millions d’enregistrements, et sont très souvent actualisées.

Je vais donc utiliser le système de River d’Elasticsearch, qui permet d’alimenter au fil de l’eau les index depuis n’importe quelle source (DropBox, Twitter, CouchDB …). Il en existe évidement un pour les bases de données SQL, jdbc-river.

Je me lance donc dans l’installation du plugin et crée ma première Rivière.

curl -XPUT 'xx.xx.xx.xx:9200/_river/ma_riviere/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://xx.xx.xx.xx:3306/mdn_mp",
        "user" : "elasticsearch",
        "password" : "je_suis_pas_fou",
        "bulk_size" : 2000,
        "max_bulk_requests" : 10,
        "bulk_flush_interval" : "1s",
        "index" : "BoostMyShop",
        "type" : "ma_table",
        "sql" : "SELECT id as _id, name FROM ma_table"
    }
}'

Ma rivière est bien créée, et je peux voir depuis les logs Elasticsearch mon index s’alimenter. Un petit coup d’oeil sur head, mon type est bien présent, et le nombre de documents indexés correspond.

Facile non ?

Finalement, non. Car je commence mes tests, un insert par ci par là, un update et finalement rien ne se passe du côté de ma rivière … Après quelques recherches sur mon ami Google, j’apprends que les update ou/et insert ne sont pas être pris en compte par le plugin jdbc-river.

En y réfléchissant, il est à mon avis impossible de définir quelles lignes d’une ou plusieurs tables ont été modifiées depuis le dernier passage de la rivière, sans nécessité une charge importante de travail (binlog ? trigger ?). Bien entendu en prenant en compte que la requête d’alimentation, peut contenir plusieurs jointures.

Première piste

Après quelques recherche, je tombe sur cet article pulling-a-large-mysql-table-into-elasticsearch.  Il explique comment mettre en place un système d’alimentation incrémental, avec quelques pré requis, donc un qui me gêne particulièrement => la table ne doit accepter que des insert, pas d’update !!

Le principe est simple, il crée une table MySQL contenant des meta données associés à sa rivière, sur les quels il va s’appuyer pour mettre à jour son index. Pour cela il va falloir coder pas loin d’une centaine de lignes, j’ai beau être développeur et aimer mon métier, c’est trop pour moi :p

Mais cet article me fait tout de même avancer, car l’idée d’utiliser une table de meta données me plaît bien. Il me faut maintenant trouver une solution pour la maintenir à jour sans avoir besoin de coder.

La solution

Ma première idée était donc de stocker, le dernier passage de ma rivière dans une table MySQL afin que cette dernière s’appuie sur ces informations pour ne récupérer que les enregistrements insérées ou modifiées. Il est possible depuis peu avec le plugin jdbc-river d’exécuter plusieurs requêtes de manière séquentielle, je compte donc me servir de cette fonctionnalité pour maintenir ma table à jour.

Ci-dessous la structure de ma table:

CREATE TABLE river_meta (
    id MEDIUMINT unsigned NOT NULL AUTO_INCREMENT,
    name VARCHAR(60) NOT NULL,
    last_update VARCHAR(30) DEFAULT 0,
    next_last_update VARCHAR(30) DEFAULT 0,
    PRIMARY KEY(id)
);

Et ma rivière:

curl -XPUT 'xx.xx.xx.xx:9200/_river/ma_riviere/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://xx.xx.xx.xx:3306/mdn_mp",
        "user" : "elasticsearch",
        "password" : "je_suis_pas_fou",
        "bulk_size" : 2000,
        "max_bulk_requests" : 10,
        "bulk_flush_interval" : "1s",
        "index" : "BoostMyShop",
        "type" : "ma_table",
        "sql" : [
            {
                "statement" : "UPDATE river_meta SET next_last_update = (SELECT MAX(id) FROM ma_table) WHERE name = \"ma_riviere\""
            },
            {
                "statement" : "SELECT id as _id, name FROM ma_table WHERE id > (SELECT last_update FROM river_meta WHERE name = \"ma_riviere\")"
            },
            {
                "statement" : "UPDATE river_meta SET last_update = next_last_update WHERE name = \"ma_riviere\""
            }
        ]
    }
}'

Les 3 requêtes sont assez simple à comprendre. Dans un premier temps je stocke le plus grand id (ou date de modification) inséré dans ma_table dans le champ next_last_update. Ensuite je récupère l’ensemble des documents contenant un id ou une date supérieur à celui de ma dernière récupération (vous pouvez ajouter <= à next_last_update). Pour finir je mets à jour le champ contenant le dernier id récupéré, en tant que base pour le prochain passage.

Création de ma rivière, premier passage bam !! Les requêtes d’écritures ne sont pas acceptées …

Je vais donc devoir procéder autrement. Je pense assez rapidement à passer par des procédures stockées, je mets donc en place mes 2 procédures:

delimiter |
DROP PROCEDURE begin_river|
CREATE PROCEDURE begin_river (IN river_name VARCHAR(60), IN river_table VARCHAR(60), IN river_column VARCHAR(60))
BEGIN
    SET @s = CONCAT('UPDATE river_meta SET next_last_update = (SELECT MAX(', river_column, ') FROM ', river_table, ') WHERE name = "', river_name,'"');
    PREPARE stmt FROM @s;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END|
delimiter ;

delimiter |
DROP PROCEDURE end_river|
CREATE PROCEDURE end_river (IN river_name VARCHAR(60))
BEGIN
    SET @s = CONCAT('UPDATE river_meta SET last_update = next_last_update WHERE name = "', river_name,'"');
    PREPARE stmt FROM @s;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END|
delimiter ;

Je modifie ma rivière en conséquence:

curl -XPUT 'xx.xx.xx.xx:9200/_river/ma_riviere/_meta' -d '{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://xx.xx.xx.xx:3306/mdn_mp",
        "user" : "elasticsearch",
        "password" : "je_suis_pas_fou",
        "bulk_size" : 2000,
        "max_bulk_requests" : 10,
        "bulk_flush_interval" : "1s",
        "index" : "BoostMyShop",
        "type" : "ma_table",
        "sql" : [
            {
                "statement" : "CALL begin_river(\"ma_riviere\", \"ma_table\", \"id\")"
            },
            {
                "statement" : "SELECT id as _id, name FROM ma_table WHERE id > (SELECT last_update FROM river_meta WHERE name = \"ma_riviere\")"
            },
            {
                "statement" : "CALL end_river(\"ma_riviere\")"
            }
        ]
    }
}'

 

Et voilà, j’ai maintenant un système pour prendre en compte les nouvelles insertions et modifications. J’ai volontairement mis en VARCHAR le champ last_update. Je ne pense pas que ça soit une nécessité de se pencher plus que ça sur la structure de la table, car elle ne dépassera pas les 10000 rivières et donc 10000 enregistrements !

Il y a certainement plusieurs façons d’améliorer ce processus, j’ai plutôt codé ça en mode POC que mode projet. Ca fonctionne évidemment aussi avec un timestamp correspondant à la date de dernière modification, ce qui permet de mettre à jour les enregistrements modifiés depuis le dernier passage.

  • alain

    très utile merci