Using ES-Hadoop on Tencent Cloud EMR & Elasticsearch: MapReduce & Hive
Using ES-Hadoop on Tencent Cloud EMR & Elasticsearch: Spark
Performance Tuning for Reading and Writing ES from Hadoop/Spark
In the previous article we covered importing and exporting ES data with Hadoop and Hive. In this article we walk through examples of using ES-Hadoop with Spark.
*Note: for resource preparation, data preparation, and the key ES-Hadoop configuration options, see the previous article.
Reading ES Data with Spark
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.Map;

public class ReadFromESBySpark {
    public static void main(String[] args) {
        // Point the connector at the ES cluster; with sliced partitions
        // disabled, each shard maps to a single Spark partition.
        SparkConf conf = new SparkConf().setAppName("my-app")
                .set("es.nodes", "10.0.4.17")
                .set("es.port", "9200")
                .set("es.nodes.wan.only", "true")
                .set("es.input.use.sliced.partitions", "false")
                .set("es.input.max.docs.per.partition", "100000000");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Query the index and pull the matching documents back as an RDD.
        JavaPairRDD<String, Map<String, Object>> rdd =
                JavaEsSpark.esRDD(sc, "logs-201998/type", "?q=clientip:247.37.0.0");
        for (Map<String, Object> item : rdd.values().collect()) {
            System.out.println(item);
        }
        sc.stop();
    }
}
The JavaEsSpark.esRDD(sc, "logs-201998/type", "?q=clientip:247.37.0.0") call runs the query ?q=clientip:247.37.0.0 against the index logs-201998/type on the ES cluster and returns the matching documents as a JavaPairRDD.
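The query argument to esRDD is not limited to a URI-style query; it also accepts a full Query DSL body. A minimal sketch, reusing the sc from the example above (the term query here is illustrative, not from the original article):

// A sketch: esRDD also accepts a JSON Query DSL string as the query.
// The term query on clientip is an illustrative assumption.
String dslQuery = "{\"query\": {\"term\": {\"clientip\": \"247.37.0.0\"}}}";
JavaPairRDD<String, Map<String, Object>> dslRdd =
        JavaEsSpark.esRDD(sc, "logs-201998/type", dslQuery);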
Writing to ES with a Spark RDD
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.spark_project.guava.collect.ImmutableList;
import org.spark_project.guava.collect.ImmutableMap;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.List;
import java.util.Map;

public class WriteToESUseRDD {
    public static void main(String[] args) {
        // Batch and retry settings for bulk writes; see the previous
        // article for what each es.* option controls.
        SparkConf conf = new SparkConf().setAppName("my-app")
                .set("es.nodes", "10.0.4.17")
                .set("es.port", "9200")
                .set("es.nodes.wan.only", "true")
                .set("es.batch.size.bytes", "30MB")
                .set("es.batch.size.entries", "20000")
                .set("es.batch.write.refresh", "false")
                .set("es.batch.write.retry.count", "50")
                .set("es.batch.write.retry.wait", "500s")
                .set("es.http.timeout", "5m")
                .set("es.http.retries", "50")
                .set("es.action.heart.beat.lead", "50s");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // One sample log document, wrapped in a single-element RDD.
        Map<String, ?> logs = ImmutableMap.of(
                "clientip", "255.255.255.254",
                "request", "POST /write/using_spark_rdd HTTP/1.1",
                "status", 200,
                "size", 802,
                "@timestamp", 895435190);
        List<Map<String, ?>> list = ImmutableList.of(logs);
        JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(list);
        JavaEsSpark.saveToEs(javaRDD, "logs-201998/type");
        sc.stop();
    }
}
Build a JavaRDD and write it to ES with JavaEsSpark.saveToEs.
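saveToEs also has an overload that takes a per-write settings map. A minimal sketch, reusing the javaRDD from above and assuming (illustratively) that the clientip field should become the document _id:

// A sketch: map a document field to the ES _id via a per-call
// settings map; these options apply only to this write.
JavaEsSpark.saveToEs(javaRDD, "logs-201998/type",
        ImmutableMap.of("es.mapping.id", "clientip"));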
Writing to ES with Spark Streaming
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;
import org.spark_project.guava.collect.ImmutableList;
import org.spark_project.guava.collect.ImmutableMap;

import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

public class WriteToESUseSparkStreaming {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("my-app")
                .set("es.nodes", "10.0.4.17")
                .set("es.port", "9200")
                .set("es.nodes.wan.only", "true")
                .set("es.batch.size.bytes", "30MB")
                .set("es.batch.size.entries", "20000")
                .set("es.batch.write.refresh", "false")
                .set("es.batch.write.retry.count", "50")
                .set("es.batch.write.retry.wait", "500s")
                .set("es.http.timeout", "5m")
                .set("es.http.retries", "50")
                .set("es.action.heart.beat.lead", "50s");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Seconds.apply(1));
        // Wrap one sample document in a single-RDD queue stream to
        // simulate a micro-batch source.
        Map<String, ?> logs = ImmutableMap.of(
                "clientip", "255.255.255.253",
                "request", "POST /write/using_spark_streaming HTTP/1.1");
        JavaRDD<Map<String, ?>> javaRDD = sc.parallelize(ImmutableList.of(logs));
        Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>();
        microbatches.add(javaRDD);
        JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);
        JavaEsSparkStreaming.saveToEs(javaDStream, "logs-201998/type");
        // The streaming job must be started for the write to happen;
        // let the single micro-batch run, then shut down.
        jssc.start();
        jssc.awaitTerminationOrTimeout(10000);
        jssc.stop();
    }
}
Build a JavaRDD, wrap it in a JavaDStream, and write it to ES by calling JavaEsSparkStreaming.saveToEs.
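If the stream already carries serialized JSON strings, JavaEsSparkStreaming.saveJsonToEs can index them as-is. A minimal sketch, reusing the sc and jssc from the example above (the JSON payload is illustrative):

// A sketch: index pre-serialized JSON strings from a DStream directly,
// skipping the Map-based serialization used above.
Queue<JavaRDD<String>> jsonBatches = new LinkedList<>();
jsonBatches.add(sc.parallelize(ImmutableList.of(
        "{\"clientip\": \"255.255.255.252\", \"status\": 200}")));
JavaDStream<String> jsonStream = jssc.queueStream(jsonBatches);
JavaEsSparkStreaming.saveJsonToEs(jsonStream, "logs-201998/type");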
Running the Job
wget http://central.maven.org/maven2/org/elasticsearch/elasticsearch-spark-20_2.11/5.6.4/elasticsearch-spark-20_2.11-5.6.4.jar
spark-submit --jars elasticsearch-spark-20_2.11-5.6.4.jar --class "ReadFromESBySpark" esspark-1.0-SNAPSHOT.jar
The --jars option loads the elasticsearch-spark connector.
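Alternatively, if the cluster nodes have network access to Maven Central, spark-submit can resolve the connector at launch time with --packages instead of a locally downloaded jar (coordinates taken from the wget URL above):

spark-submit --packages org.elasticsearch:elasticsearch-spark-20_2.11:5.6.4 --class "ReadFromESBySpark" esspark-1.0-SNAPSHOT.jar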
Summary
Compared with Hadoop, Spark offers more ways to interact with ES, including RDDs, Spark Streaming, and the Dataset/Spark SQL modes not covered in this article. The Scala versions of the code are also not listed here; refer to the official Elastic documentation for hands-on practice.
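As a pointer for the Spark SQL mode mentioned above, here is a minimal sketch that loads an ES index as a DataFrame through the connector's data source; the class name ReadFromESBySparkSQL and the filter are illustrative, and the es.* options mirror those used earlier:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadFromESBySparkSQL {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("my-app").getOrCreate();
        // Load the ES index as a DataFrame via the connector's data source.
        Dataset<Row> df = spark.read()
                .format("org.elasticsearch.spark.sql")
                .option("es.nodes", "10.0.4.17")
                .option("es.port", "9200")
                .option("es.nodes.wan.only", "true")
                .load("logs-201998/type");
        // Filter pushdown: the predicate is translated into an ES query.
        df.filter(df.col("clientip").equalTo("247.37.0.0")).show();
        spark.stop();
    }
}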