网上有许多elasticsearch和mongodb之间通过elasticsearch的mongodb-river插件建立管道的文章,这些文章都有一个要求:需要一个mongodb的集群,也就是需要至少两个mongodb的实例。原因也好理解:river插件是通过读取mongodb的oplog.rs这个表来同步mongodb中的更新的,而要oplog.rs只有在replicset中才能生效。所以本文的重点也就变成了如何在一个mongodb的实例下搭建replicset环境了。
?
总之,三个步骤:
- 搭建单机replicSet
- 安装mongodb-river插件
- 验证
搭建replicSet
这个问题其实网上已经有现成的方案了,参见:http://loosexaml.wordpress.com/2012/09/03/how-to-get-a-mongodb-oplog-without-a-full-replica-set/
不能访问这个地址的,我这里摘录关键的两步:
1、配置/etc/mongodb.conf
增加两个配置:
?
replSet=rs0 #这里是指定replSet的名字 oplogSize=100 #这里是指定oplog表数据大小(太大了不支持)
?2 初始化replicSet
?
?
rs.initiate( {"_id" : "rs0", "version" : 1, "members" : [ { "_id" : 0, "host" : "127.0.0.1:27017" } ]})
?
?
我在做这个设置的时候,碰到了点问题:
?
com.mongodb.ConnectionStatus$UpdatableNode update WARNING: Server seen down: /127.0.1.1:27017 - java.io.IOException - message: couldn't connect to [/127.0.1.1:27017] bc:java.net.ConnectException: Connection refused
?但是从mongo shell上和程序代码上都能连接到mongodb,不知道具体原因,换了最新的mongodb 2.2.3问题解决(没有更换任何配置文件和数据文件)。
?
?
结果
搭建好replicSet之后,在mongo shell中的提示符会变成:
?
rs0:PRIMARY>
?
?
安装 mongodb-river插件
插件在此:
https://github.com/richardwilly98/elasticsearch-river-mongodb
基本上可以分为两步:
- 准备jar包
- 创建meta信息
1、两个必须的jar包(elasticsearch-river-mongodb-1.6.2-SNAPSHOT.jar和mongo-java-driver-2.10.1.jar)放到es_home/plugins/mongodb_river
其中mongodb_river是自定义的一个名字。
?
2、创建meta信息
?
?
curl -XPUT "localhost:9400/_river/mongodb_application/_meta" -d ' { "type": "mongodb", "mongodb": { "host": "localhost", "port": "27017", "db": "application", "collection": "page" }, "index": { "name": "application", "type": "page"} } '
?主要分为三个部分:
type:river的类型,也就是“mongodb”
mongodb:mongodb的连接信息
index:elastisearch中用于接收mongodb数据的index和“type”。
?
验证结果
?
curl "http://localhost:9400/_river/mongodb_application/_meta"
?
?
验证
在mongo里插入一条数据,在对应的elasticsearch里就会有相应的数据。
?
一个问题
在river建立之后的数据变动会体现在elasticsearh里,但是river建立前的数据变动因为没有在oplog表里,所以不能被同步。我的解决方案是,遍历一次需要导出的表,重新插入到另外一个表里,然后将river指定到这个新表,这样新表的变动就可以全部体现在oplog里了。
遍历mongodb的表可以通过cursor来实现:
?
var myCursor = db.page.find( { }, {html:0} ); myCursor.forEach(function(myDoc) {db.application.save(myDoc); });
其中page是老表,application是新表(database名字也叫application,不要弄错)
?
?
?
?