問題

ファイルがそのディレクトリに表示されるとすぐにspark.readStreamを使用して、ディレクトリを連続的に監視し、CSVファイルを読み取るようにします。

Spark Streamingのソリューションを含めないでください。私はスパーク構造ストリーミングを使ってそれを行う方法を探しています。

  ベストアンサー

このユースケースの完全なソリューションは次のとおりです。

スタンドアローンモードで動作している場合は、次のようにドライバのメモリを増やすことができます。

 bin/spark-shell --driver-memory 4G
 

ドライバ内で Stand Alone mode executor のように executor メモリを設定する必要はありません。

@T.Gawedaのソリューションを完成させると、以下の解決策を見つける:

 val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")

csvDf.writeStream.format("console").option("truncate","false").start()
 

これで、sparkは指定されたディレクトリを継続的に監視し、ディレクトリにcsvファイルを追加するとすぐにDataFrame操作 "csvDF"がそのファイルで実行されます。

注:スパークをインフェルスキーマにしたい場合は、まず次の設定を設定する必要があります。

 spark.sqlContext.setConf("spark.sql.streaming.schemaInferenc‌​e","true")
 

スパークはスパークセッションです。

  同じタグがついた質問を見る

scalaapache-sparkspark-structured-streaming