Wenn eine neue Datei in S3 ankommt, luigi Aufgabe auslösen

Ich habe einen Eimer mit neuen Objekten, die in zufälligen Intervallen mit Schlüsseln auf der Grundlage ihrer Zeit der Schaffung hinzugefügt werden. Beispielsweise:

's3://my-bucket/mass/%s/%s/%s/%s/%s_%s.csv' % (time.strftime('%Y'), time.strftime('%m'), time.strftime('%d'), time.strftime('%H'), name, the_time) 

In der Tat, das sind die Ausgänge der Scrapy-Crawls. Ich möchte eine Aufgabe auslösen, die diesen Crawls entspricht, zu einem Master .csv Produktkatalogdatei Ich habe (nennen Sie es "product_catalog.csv"), das auch regelmäßig aktualisiert wird.

Im Moment habe ich mehrere Python-Skripte, die ich mit globalen Variablen geschrieben habe, die ich jedes Mal, wenn ich diesen Prozess ausführe, fülle. Diese müssen importierte Attribute werden.

Also hier ist was passieren muss

1) Neue csv-Datei erscheint in "s3: // my-bucket / mass / …" mit einem eindeutigen Schlüsselnamen auf der Grundlage der Zeit, die der Crawling abgeschlossen hat. Luigi sieht das und fängt an.
2) "cleaning.py" wird von luigi auf der neuen Datei ausgeführt, so dass der Parameter von "cleaning.py" (die Datei, die in S3 auftauchte) zur Laufzeit geliefert werden muss. Die Ergebnisse werden in S3 gespeichert, zusätzlich zum Weitergabe an den nächsten Schritt.
3) Die neueste Version von "product_catalog.csv" wird aus einer Datenbank gezogen und verwendet die Ergebnisse von "cleaning.py" in "matching.py"

Ich weiß, das kann nicht ganz sinnvoll sein. Ich werde ggf. Änderungen vornehmen, um alles klarer zu machen.

BEARBEITEN

Basierend auf anfänglichen Antworten habe ich dies als eine Pull-Operation konfiguriert, die Schritte auf dem Weg spart. Aber jetzt bin ich ziemlich verloren Es sollte beachtet werden, dass dies mein erstes Mal ein Python-Projekt zusammenbindet, also gibt es Dinge wie einschließlich init .py, dass ich lerne, wie ich das mache. Wie üblich, ist es eine holprige Straße der Aufregung von den Erfolgen gefolgt sofort durch Verwirrung an der nächsten Straßensperre.

Hier sind meine Fragen:
1) Wie man die Spinnen von Scrapy importiert, ist mir unklar. Ich habe etwa ein Dutzend von ihnen und das Ziel ist es, luigi verwalten den Prozess der Crawling> clean> Match für alle von ihnen. Die Scrapy-Dokumentation soll Folgendes enthalten:

 class MySpider(scrapy.Spider): # Your spider definition 

Was bedeutet das? Schreiben Sie die Spinne in das Skript, das die Spinne kontrolliert? Das macht keinen Sinn und ihre Beispiele sind nicht hilfreich.

2) Ich habe Scrapy Pipelines konfiguriert, um nach S3 zu exportieren, aber luigi scheint auch dies mit output () zu tun. Was soll ich benutzen und wie bekomme ich sie zusammen zu spielen?

3) Luigi sagt, dass CrawlTask ​​() erfolgreich gelaufen ist, aber das ist falsch, weil es in Sekunden abschließt und die Crawls in der Regel ein paar Minuten dauern. Es gibt auch keine Ausgabedatei, die dem Erfolg entspricht.

4) Wo stelle ich die Anmeldeinformationen für S3 zur Verfügung?

Hier ist mein Code. Ich habe Dinge kommentiert, die nicht anstelle dessen arbeiten, was ich als besser finde. Aber mein Sinn ist, dass es eine großartige Architektur gibt, was ich tun möchte, die ich gerade noch nicht verstehe.

 import luigi from luigi.s3 import S3Target, S3Client import my_matching from datetime import datetime import os import scrapy from twisted.internet import reactor from scrapy.crawler import CrawlerProcess from scrapy.utils.project import get_project_settings from my_crawlers.my_crawlers.spiders import my_spider class CrawlTask(luigi.Task): crawltime = datetime.now() spider = luigi.Parameter() #vertical = luigi.Parameter() def requires(self): pass def output(self): return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_{}.csv".format(self.crawltime)) #return S3Target("s3://my-bucket/mass/crawl_luigi_test_{}.csv".format(self.crawltime)) def run(self): os.system("scrapy crawl %s" % self.spider) #process = CrawlerProcess(get_project_settings()) #process.crawl("%s" % self.spider) #process.start() class FetchPC(luigi.Task): vertical = luigi.Parameter() def output(self): if self.vertical == "product1": return "actual_data_staging/product1_catalog.csv" elif self.vertical == "product2": return "actual_data_staging/product2_catalog.csv" class MatchTask(luigi.Task): crawltime = CrawlTask.crawltime vertical = luigi.Parameter() spider = luigi.Parameter() def requires(self): return CrawlTask(spider=self.spider) return FetchPC(vertical=self.vertical) def output(self): return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_matched_{}.csv".format(self.crawltime)) #return S3Target("s3://my-bucket/mass/crawl_luigi_test_matched_{}.csv".format(CrawlTask.crawltime)) def run(self): if self.vertical == 'product1': switch_board(requires.CrawlTask(), requires.FetchPC()) 

Die MatchTask bezieht sich auf ein Python-Skript, das ich geschrieben habe, das die geschabten Produkte mit meinem Produktkatalog vergleicht. Es sieht aus wie das:

 def create_search(value): ... def clean_column(column): ... def color_false_positive(): ... def switch_board(scrape, product_catalog): # this function coordinates the whole script 

2 Solutions collect form web for “Wenn eine neue Datei in S3 ankommt, luigi Aufgabe auslösen”

Unten ist ein sehr grober Umriss, wie es aussehen könnte. Ich denke, der Hauptunterschied von Ihrem Umriss in Bezug auf luigi Arbeit als Pull-System ist, dass Sie die Ausgabe, die Sie wollen zuerst, die dann löst die anderen Aufgaben, auf denen diese Ausgabe hängt ab. Also, anstatt die Dinge mit der Zeit zu benennen, die das Crawling beendet, ist es einfacher, die Dinge nach etwas zu nennen, was du am Anfang kennst. Es ist möglich, es anders zu machen, nur eine Menge unnötige Komplikationen.

 class CrawlTask(luigi.Task): crawltime = luigi.DateParameter() def requires(self): pass def get_filename(self): return "s3://my-bucket/crawl_{}.csv".format(self.crawltime) def output(self): return S3Target(self.get_filename()) def run(self): perform_crawl(s3_filename=self.get_filename()) class CleanTask(luigi.Task): crawltime = luigi.DateParameter() def requires(self): return CrawlTask(crawltime=self.crawltime) def get_filename(self): return "s3://my-bucket/clean_crawl_{}.csv".format(self.crawltime) def output(self): return S3Target(self.get_filename()) def run(self): perform_clean(input_file=self.input().path, output_filename=self.get_filename()) class MatchTask(luigi.Task): crawltime = luigi.DateParameter() def requires(self): return CleanTask(crawltime=self.crawltime) def output(self): return ##?? whatever output of this task is def run(self): perform_match(input_file=self.input().path) 

Was Sie tun können, ist ein größeres System zu erstellen, das sowohl Ihre Crawls als auch die Verarbeitung kapselt. Auf diese Weise müssen Sie s3 nicht auf neue Objekte prüfen. Ich habe Luigi noch nicht benutzt, aber vielleicht kannst du deinen kratzigen Job in eine Aufgabe verwandeln, und wenn es fertig ist, verarbeite deine Aufgabe. Wie auch immer, ich glaube nicht, dass 'Check' s3 für neue Sachen ist eine gute Idee, weil 1. Sie müssen viele API-Anrufe zu verwenden, und 2. Sie müssen eine Reihe von Code zu schreiben, ob etwas ist 'neu 'Oder nicht, was könnte haarig werden.

Python ist die beste Programmiersprache der Welt.