PDFs mit Solr und Symfony indexieren

Das Indexieren von Daten um später besser danach suchen zu können, ist mit Hilfe von Apache Solr und einen geeigneten API-Client sehr einfach geworden.

In diesen Post möchte ich am Anfang darauf eingehen wie man generell Dateien mit Solr indexiert. Im zweiten Teil gehe ich konkreter drauf ein wie alles in ein Symfony2/3 Projekt integrieren kann und die eigentliche Suche aussieht.

PDF-Indexierung mit Solr

Solr wird seit Version 5 mit dem bin/post Kommandozeilen-Tool ausgeliefert, womit es möglich ist die gängigsten Dateiformate zu indexieren.

Zunächst muss ein sogenannter RequestHandler und der Pfad zu den Parser-Libs in der solrconfig.xml registriert werden:

<requestHandler name="/update/extract"
                  startup="lazy"
                  class="solr.extraction.ExtractingRequestHandler" >
    <lst name="defaults">
      <str name="lowernames">true</str>
      <str name="uprefix">ignored_</str>

      <!-- capture link hrefs but ignore div attributes -->
      <str name="captureAttr">true</str>
      <str name="fmap.a">links</str>
      <str name="fmap.div">ignored_</str>
    </lst>
  </requestHandler>
  <lib dir="/opt/solr/dist" />
  <lib dir="/opt/solr/contrib/extraction/lib" />

Nun kann die Indexierung gestartet werden:

bin/post -c core0 /path/to/file.pdf -params "literal.id=1&uprefix=attr_"

Zur Erklärung: -c core0 gibt an in welcher collection das Dokument abgelegt werden soll, in diesen Fall ist es core0. Der zweite Parameter gibt den Pfad zur einer Datei oder einen Verzeichnis mit den Dateien an die indexiert werden sollen. Als dritter Parameter wird hier -params übergeben der die Parameter direkt an den jeweiligen Parser weitergibt. Mit literal.id wird die Id des neuen Dokumentes bestimmt. Wenn das Dokument weitere Felder haben soll können diese ebenfalls übergeben werden: literal.id=1&literal.field_name=field_value. Außerdem erhalten alle Felder den Präfix attr_. Sobald ein Feld den attr_ Präfix hat wird es als Feld vom Typ Text erkannt.

Wenn der Indexierung erfolgreich war befindet sich ein neues Dokument mit der id 1 und diversen Feldern die Tika generiert im Index.

Integration in Symfony

Um einen möglichen Dateiupload näher darzustellen möchte ich hierfür auf einen Artikel der Symfony Dokumentation verweisen. Dort wird ein Dateiupload für ein Produkt implementiert.

Da in dem Beispiel der Upload an ein Produkt gebunden ist macht es Sinn diese Information ebenfalls zu indexieren, um später eine Zuordnung zum Produkt vorzunehmen. Hierfür wird dem -param Parameter eine weitere Option übergeben: -param „literal.product_id=1“. Das erzeugt Dokument besitzt dadurch ein attr_product_id Feld .

Letztere Möglichkeit macht am meistens Sinn.

use Symfony\Component\Process\Process;

class ProductController extends Controller
{
    public function newAction(Request $request)
    {
        // ...
        $filePath = $this->getParameter('brochures_directory') . '/' . $fileName;
        $process = new Process(sprintf('/opt/solr/bin/post -c core0 %s -params "literal.product_id=%s&uprefix=attr_"', $filePath, $product->getId()));
        $process->setTimeout(360000);
        $process->run(function ($type, $buffer) {
            if (Process::ERR === $type) {
                $this->get('logger')->error($buffer);
            } else {
                $this->get('logger')->debug($buffer);
            }
        });
    }
}

Die Indexierung wird gestartet in dem ein Prozess mit langer Laufzeit ausgeführt wird. Sollte ein Fehler auftreten wird ein Logeintrag erstellt. Sonstiger Output wird als Debug-Message ebenfalls geloggt. Neben der Id und dem product_id Feld werden diverse weitere Metadaten indexiert: Autor, Erstellungsdatum, Dateigröße und der komplette Inhalt des PDFs.

Suche nach Dokumenten

Für die Suche bietet es sich an Solarium als Client zu verwenden.

$client = new Solarium\Client($config);
$selectQuery = $client->createSelect();
$selectQuery->setFields(array('attr_product_id'));
$selectQuery->setQuery(sprintf('attr_content:*%1$s* OR attr_content:%1$s', $search, $search));

$pdfs = $client->select($selectQuery);

In dem Select wird im Feld attr_content, das den Inhalt des PDFs beinhaltet, nach einen String gesucht. Da die meisten Felder des Dokuments ehr uninteressant sind, soll nur das Feld product_id selektiert werden.

Diese Suche hat noch eine Limitierung: Groß/Kleinschreibung. Enthält das indexierte PDF das Wort „Symfony“ und sucht der Nutzer nach „symfony“ wird kein Dokument gefunden. Um das Problem zu lösen muss ein neuer Feldtypen mit LowerCaseFilter  erstellt werden. Dieser ersetzt alle Großbuchstaben durch Kleinbuchstaben zur Laufzeit der Suchanfrage.

Zuerst muss ein neuer Feldtyp erstellt werden:

<fieldType name="text_de" class="solr.TextField" positionIncrementGap="100">
  <analyzer> 
    <tokenizer class="solr.StandardTokenizerFactory"/>
    <filter class="solr.LowerCaseFilterFactory"/>
    <filter class="solr.GermanNormalizationFilterFactory"/>
    <filter class="solr.GermanLightStemFilterFactory"/>
  </analyzer>
</fieldType>

Neben dem LowerCaseFilter werden noch zwei weitere Filter registriert, mit denen die Suche nach „Früchte“ auch Ergebnisse liefert wenn Dokumente das Wort „Frucht“ enthalten.

Ist das erledigt muss ein neues Feld erstellt werden mit dem vorher erstellten Feldtypen.

<field name="attr_content" type="text_de" indexed="true" stored="true"/>

Ist das erledigt kann Solr neugestartet werden.

Hinweise: in der solrconfig.xml muss

<schemaFactory class="ManagedIndexSchemaFactory">
  <bool name="mutable">true</bool>
  <str name="managedSchemaResourceName">managed-schema</str>
</schemaFactory>

durch

<schemaFactory class="ClassicIndexSchemaFactory"/>

ersetzt werden sonst wird die schema.xml Datei nicht verwendet und es gibt Fehler beim Start von Solr.

Refactoring

Um die Controller-Action möglichst schlank zu halten, sollte die Prozessausführung in irgendeiner Art gekapselt werden. Für diesen speziellen Fall bietet sich eine Queue an die sequenziell abgearbeitet wird. Um den Konfigurationsaufwand so gering wie möglich zu halten, soll hier das QPush-Bundle verwendet werden das eine einfache Queue mitbringt.

Das Bundle muss mit Composer installiert und später im AppKernel registriert werden. Als nächstes muss eine Queue erstellt werden, in der die Messages abgelegt werden können:

uecode_qpush:
   providers:
       product_upload_file_based:
           driver: file
           path: "%kernel.root_dir%/Resource/queue/product_upload"

   queues:
       product_upload:
           provider: product_upload_file_based
           options:
               message_delay: 0 # message kann sofort bearbeitet werden
               message_expiration: 604800 # eine Woche wird die Message gespeichert

Zuerst wird ein neuer Provider erstellt der die Messages im Dateisystem ablegt. Dieser wird anschließend von der Queue product_upload benutzt.  Als nächstes muss eine neue Message auf die Queue gelegt werden:

use Symfony\Component\Process\Process;

class ProductController extends Controller
{
    public function newAction(Request $request)
    {
        // ...
        $this->get('uecode_qpush.product_upload')->publish(array(
            'product-id' => $product->getId(),
        ));
    }
}

Im vorher konfigurieren Verzeichnis ist nun eine JSON-Datei zu finden welche die Produkt-Id enthält. Um die Queue abzuarbeiten muss ein Command der den Namen der Queue erwartet aufgerufen werden, der ein message_received Event feuert:

./bin/console uecode:qpush:receive product_upload

Die Servicedefinition des Eventhandler hat keine großen Besonderheiten. Zu beachten ist allerdings das der Eventname den Namen der Queue beinhaltet:

services:
    app.listener.product.index_product_brochure_listener:
    class: AppBundle\Listener\Product\IndexProductBrochureListener
    arguments:[ "@logger", "@doctrine.orm.entity_manager", "%brochure_directory%" ]
    tags:
        - { name: uecode_qpush.event_listener, event: product_upload.message_received, method: onUpload }

Und der Listener dazu:

<?php

namespace AppBundle\Listener\Product;

use AppBundle\Entity\Product;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Process\Process;
use Uecode\Bundle\QPushBundle\Event\MessageEvent;

class IndexProductBrochureListener
{
    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @var EntityManagerInterface
     */
    private $entityManager;

    /**
     * @var string
     */
    private $downloadsDir;

    /**
     * @param LoggerInterface        $logger
     * @param EntityManagerInterface $entityManager
     * @param string                 $downloadsDir
     */
    public function __construct(LoggerInterface $logger, EntityManagerInterface $entityManager, $brochuresDirectory)
    {
        $this->logger = $logger;
        $this->entityManager = $entityManager;
        $this->brochuresDirectory = $brochuresDirectory;
    }

    /**
     * @param MessageEvent $event
     */
    public function onUpload(MessageEvent $event)
    {
        $messageBody = $event->getMessage()->getBody();
        $productId = $messageBody['product-id'];

        $product = $this->entityManager->getRepository(Product::class)->find($productId);
        $path = $this->brochuresDirectory . '/' . $product->getBrochure();

        $process = new Process(sprintf('/opt/solr/bin/post -c core0 %s -params "literal.product_id=%s&uprefix=attr_"', $path, $product->getId()));
        $process->setTimeout(360000);
        $process->run(function ($type, $buffer) {
            if (Process::ERR === $type) {
                $this->logger->error($buffer);
            } else {
                $this->logger->debug($buffer);
            }
        });
    }

}