spark操作es需要elasticsearch-hadoop-xxx.jar,版本需同es版本。从5.0版本开始,支持spark2.0。
把elasticsearch-hadoop-xxx.jar放在spark的类路径(classpath)中。
1、Native RDD support 原生RDD支持
不学。
2、Spark SQL support
java代码示例:
public static void main(String[] args) { SparkSession spark = SparkSession.builder() .appName("heihei").master("local[*]") .getOrCreate(); Map cfg = new HashMap(8); cfg.put("es.nodes", "192.168.56.12"); cfg.put("es.port", "9200"); Datasetdf = spark.read().format("org.elasticsearch.spark.sql") .options(cfg) .load("twitter/_doc"); if (df.count() == 0) { df = spark.createDataFrame(Arrays.asList(new Person("3", "zhang san", "123456", "2018-11-17", "2018-11-17") , new Person("" + Integer.MAX_VALUE, "wang zhao jun", "123456", "2018-11-18", "2018-11-18") ), Person.class); } cfg.put("es.index.auto.create", "false"); cfg.put("es.mapping.id", "id"); cfg.put("es.internal.spark.sql.pushdown", "true"); df.write().format("org.elasticsearch.spark.sql").mode(SaveMode.Overwrite) .options(cfg) .save("twitter/_doc"); }
其中pushdown表示Whether to translate (push-down) Spark SQL into Elasticsearch Query DSL。
最后,在用save()输出时,如果不想把index/type当做save()方法的入参,还可以用option(“path”,"index/type")的方式指定。
3、Spark Streaming support