ES-Hadoop is an open source connector developed by Elastic. It connects Elasticsearch to Apache Hadoop and enables data transfer between them. ES-Hadoop combines the fast search capability of Elasticsearch with the batch processing capability of Hadoop to achieve interactive data processing. For some complex data analytics tasks, you may need to run a MapReduce task that reads data from JSON files stored in Hadoop Distributed File System (HDFS) and writes the data to an Elasticsearch cluster. This topic describes how to use ES-Hadoop to run such a MapReduce task.
Procedure
1. Create an Alibaba Cloud Elasticsearch cluster and an E-MapReduce (EMR) cluster in the same virtual private cloud (VPC). Then, enable the Auto Indexing feature for the Elasticsearch cluster, and prepare test data and a Java environment.
2. Download the ES-Hadoop package and upload the package to the HDFS directory on the master node of the EMR cluster.
3. Create a Java Maven project and configure POM dependencies.
4. Compile the Java code that is used to write data to the Elasticsearch cluster, package the code into a JAR file, and upload the file to the EMR cluster. Then, run the code in a MapReduce task to write data.
5. Log on to the Kibana console of the Elasticsearch cluster. Then, query the data that is written by the MapReduce task.
For more information, see Create an Alibaba Cloud Elasticsearch cluster and Enable the Auto Indexing feature. In this topic, an Elasticsearch V6.7.0 cluster is created.
Notice: In a production environment, we recommend that you disable the Auto Indexing feature. In this case, you must create an index and configure mappings for the index in advance. The Auto Indexing feature is enabled in this topic because the Elasticsearch cluster is used only for testing.
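For example, if you create the index in advance, you can run a command similar to the following in the Kibana console. The index name matches the one used later in this topic; the field types are assumptions based on the test data:

```
PUT maptest
{
  "mappings": {
    "_doc": {
      "properties": {
        "id":    { "type": "long" },
        "name":  { "type": "keyword" },
        "birth": { "type": "date" },
        "addr":  { "type": "text" }
      }
    }
  }
}
```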
EMR cluster configuration:
For more information, see Create a cluster.
Notice: By default, 0.0.0.0/0 is specified in the private IP address whitelist of the Elasticsearch cluster. You can view the whitelist configuration on the security configuration page of the cluster. If the default setting is not used, you must add the private IP address of the EMR cluster to the whitelist.
The following test data is used in this topic:
{"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
{"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
{"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
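The test data must be available in HDFS before the MapReduce task runs. The following sketch saves the three test records to a file named map.json and shows how to upload the file to HDFS. The file name and directory are chosen to match the ones used later in this topic:

```shell
# Save the three test records to a local file named map.json.
cat > map.json <<'EOF'
{"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
{"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
{"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
EOF

# On the master node of the EMR cluster, upload the file to the HDFS
# directory that the MapReduce task reads from:
#   hadoop fs -mkdir -p /tmp/hadoop-es
#   hadoop fs -put map.json /tmp/hadoop-es

# Confirm that the file contains three records.
wc -l map.json
```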
The elasticsearch-hadoop-6.7.0.zip package is used in this topic. After you download and decompress the package, run the following commands on the master node of the EMR cluster to upload the JAR file to HDFS:
hadoop fs -mkdir /tmp/hadoop-es
hadoop fs -put elasticsearch-hadoop-6.7.0/dist/elasticsearch-hadoop-6.7.0.jar /tmp/hadoop-es
Create a Java Maven project and add the following build configuration and dependencies to the pom.xml file of the project.
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>WriteToEsWithMR</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.8.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.8.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.8.5</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-auth</artifactId>
    <version>2.8.5</version>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop-mr</artifactId>
    <version>6.7.0</version>
  </dependency>
  <dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
  </dependency>
</dependencies>
Notice Make sure that the versions of POM dependencies are consistent with those of the related Alibaba Cloud services. For example, the version of elasticsearch-hadoop-mr is consistent with that of Alibaba Cloud Elasticsearch, and the version of hadoop-hdfs is consistent with that of HDFS.
Step 3: Compile code and run a MapReduce task
The following code reads data from the JSON file in the /tmp/hadoop-es directory of HDFS and writes each line of the file as a document to the Elasticsearch cluster. The documents are written by EsOutputFormat in the map stage.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class WriteToEsWithMR extends Configured implements Tool {

    public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> {
        private Text doc = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Forward each non-empty JSON line to Elasticsearch as-is.
            if (value.getLength() > 0) {
                doc.set(value);
                System.out.println(value);
                context.write(NullWritable.get(), doc);
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Disable speculative execution to prevent duplicate documents from being written.
        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);
        // Connection settings of the Elasticsearch cluster.
        conf.set("es.nodes", "es-cn-4591jumei000u****.elasticsearch.aliyuncs.com");
        conf.set("es.port", "9200");
        conf.set("es.net.http.auth.user", "elastic");
        conf.set("es.net.http.auth.pass", "xxxxxx");
        conf.set("es.nodes.wan.only", "true");
        conf.set("es.nodes.discovery", "false");
        conf.set("es.input.use.sliced.partitions", "false");
        // Write documents to the _doc type of the maptest index.
        conf.set("es.resource", "maptest/_doc");
        // The input is already JSON, so it is passed through without serialization.
        conf.set("es.input.json", "true");

        Job job = Job.getInstance(conf);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setJarByClass(WriteToEsWithMR.class);
        job.setMapperClass(EsMapper.class);
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new WriteToEsWithMR(), args);
        System.exit(ret);
    }
}
Table 1. ES-Hadoop parameters

- es.nodes (default: localhost): The endpoint that is used to access the Elasticsearch cluster. We recommend that you use the internal endpoint. You can obtain the internal endpoint on the Basic Information page of the Elasticsearch cluster. For more information, see View the basic information of a cluster.
- es.port (default: 9200): The port number that is used to access the Elasticsearch cluster.
- es.net.http.auth.user (default: elastic): The username that is used to access the Elasticsearch cluster. Note: If you use the elastic account to access your Elasticsearch cluster and then reset the password of the account, it may require some time for the new password to take effect. During this period, you cannot use the elastic account to access the cluster. Therefore, we recommend that you do not use the elastic account. Instead, log on to the Kibana console and create a user with the required role. For more information, see Use the RBAC mechanism provided by Elasticsearch X-Pack to implement access control.
- es.net.http.auth.pass (default: none): The password that is used to access the Elasticsearch cluster.
- es.nodes.wan.only (default: false): Specifies whether to connect to the Elasticsearch cluster only over the declared endpoint and disable node sniffing, which is required when the cluster is accessed by using a virtual IP address. Valid values: true and false. Notice: If you use Alibaba Cloud Elasticsearch, you must set this parameter to true, as shown in the preceding code.
- es.input.use.sliced.partitions (default: true): Specifies whether to use sliced partitions. Valid values: true and false.

For more information about the configuration items of ES-Hadoop, see the open source ES-Hadoop configuration documentation.
After you upload the JAR package to the master node of the EMR cluster, run the following command to submit the MapReduce task:
hadoop jar es-mapreduce-1.0-SNAPSHOT.jar /tmp/hadoop-es/map.json
Note Replace es-mapreduce-1.0-SNAPSHOT.jar with the name of the uploaded JAR file.
Log on to the Kibana console of the Elasticsearch cluster and run the following command to query the data that is written by the MapReduce task:
GET maptest/_search
{
  "query": {
    "match_all": {}
  }
}
If the command is successfully run, the documents that are written by the MapReduce task are returned.
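An illustrative response is similar to the following. Values such as _id and _score vary between runs; this sketch assumes that the three test records were written:

```
{
  "hits": {
    "total": 3,
    "hits": [
      {
        "_index": "maptest",
        "_type": "_doc",
        "_id": "xxxxxx",
        "_score": 1.0,
        "_source": {
          "id": 1,
          "name": "zhangsan",
          "birth": "1990-01-01",
          "addr": "No.969, wenyixi Rd, yuhang, hangzhou"
        }
      }
    ]
  }
}
```

The other two documents are returned in the same format.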
This topic describes how to use ES-Hadoop to write data to Elasticsearch by running a MapReduce task in an EMR cluster. You can also run a MapReduce task to read data from Elasticsearch. The configurations for data read operations are similar to those for data write operations. For more information, see Reading data from Elasticsearch in open source Elasticsearch documentation.