使用 GeoMesa Native API 向 accumulo 中插入数据
Using GeoMesa Native API to insert data in accumulo
我正在尝试使用 GeoMesa Native API 从 accumulo 存储中插入和读取数据。我已经创建了一个 class 文件来本地使用 geomesa accumulo 存储。这是我的 java 代码:
package org.locationtech.geomesa.api;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.junit.Assert;
import org.junit.Test;
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$;
import org.locationtech.geomesa.utils.index.IndexMode$;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.type.AttributeDescriptor;
import org.opengis.filter.FilterFactory2;
import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
public class WorkerBeta {
public static void main(String[] args){
try {
DomainObjectValueSerializer dovs = new DomainObjectValueSerializer();
final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
"aj_v14",
"localhost:2181",
"hps",
"root", "9869547580",
false,
dovs,
new SimpleFeatureView<DomainObject>() {
AttributeTypeBuilder atb = new AttributeTypeBuilder();
private List<AttributeDescriptor> attributeDescriptors =
Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId")
, atb.binding(String.class).buildDescriptor("dId")
, atb.binding(Integer.class).buildDescriptor("s")
, atb.binding(Integer.class).buildDescriptor("a")
, atb.binding(Integer.class).buildDescriptor("e")
);
@Override
public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) {
f.setAttribute("rId", domainObject.rideId);
f.setAttribute("dId", domainObject.deviceId);
f.setAttribute("s", domainObject.speed);
f.setAttribute("a", domainObject.angle);
f.setAttribute("e", domainObject.error);
}
@Override
public List<AttributeDescriptor> getExtraAttributes() {
return attributeDescriptors;
}
}
);
//Inserting
final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1);
final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory();
System.out.println(index.insert(
one,
gf.createPoint(new Coordinate(-74.0, 34.0)),
date("2017-03-31T01:15:00.000Z")
));
//Read
GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder()
.within(-90.0, -180, 90, 180)
.during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z"))
.build();
Iterable<DomainObject> results = index.query(q);
int counter = 0;
for(DomainObject dm : results){
counter += 1;
System.out.println("result counter: " + counter);
dovs.toBytes(dm);
}
}
catch (Exception ex){
ex.printStackTrace();
}
}
public static class DomainObject {
public final int rideId;
public final String deviceId;
public final int angle;
public final int speed;
public final int error;
public DomainObject(int rideId, String deviceId, int angle, int speed, int error) {
this.rideId = rideId;
this.deviceId = deviceId;
this.angle = angle;
this.speed = speed;
this.error = error;
}
}
public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> {
public static final Gson gson = new Gson();
@Override
public byte[] toBytes(DomainObject o) {
return gson.toJson(o).getBytes();
}
@Override
public DomainObject fromBytes(byte[] bytes) {
return gson.fromJson(new String(bytes), DomainObject.class);
}
}
public static Date date(String s) {
return Date.from(ZonedDateTime.parse(s).toInstant());
}
}
命令的日志是:
suresh@hpss-MacBook-Air:~/GeomesaAccumuloNativeClient $ java -cp target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar org.locationtech.geomesa.api.WorkerBeta
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:host.name=192.168.1.103
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_121
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.class.path=target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/Users/suresh/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/var/folders/yk/h858t8h57nz42t6t4nqmwhcc0000gp/T/
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.name=Mac OS X
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.arch=x86_64
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.version=10.12.3
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.name=suresh
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.home=/Users/suresh
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.dir=/Users/suresh/GeomesaAccumuloNativeClient
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.accumulo.fate.zookeeper.ZooSession$ZooWatcher@73eb439a
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x15aea0c41f601a1, negotiated timeout = 30000
17/04/01 15:11:52 WARN data.AccumuloDataStore: Configured server-side iterators do not match client version - client version: 1.3.2-SNAPSHOT, server version: 1.3.0
50fa12fb-11f8-4776-bb35-95b32da9225d
[]
但是当我尝试验证插入的记录时,我无法在 created 中找到任何与 accumulo 网络界面表中插入的数据相关的特定条目。这是累积表 的屏幕截图。如果我遗漏了什么,请纠正我。提前致谢。
两条速记:
您的类型没有名为 'dtg' 的字段,GeoMesaQuery 假设有一个。要轻松解决此问题,您可以使用 'GeoMesaQuery.GeoMesaQueryBuilder.builder().include().build()'。从长远来看,原生 api 可以使用一些改进来轻松流畅地执行您想要的操作。
要查看记录是否已写入 Accumulo,您可以使用 Accumulo shell 并扫描各个表。如果表中没有任何内容,可能值得调试此 code.
可能您的插入没有刷新到磁盘。 Accumulo 使用批处理写入器来提高性能——一旦其内部缓冲区已满,这将定期写入磁盘。由于您只插入一条记录,因此不会发生这种情况。
要修复,您可以在 GeoMesaIndex
实例上调用 close
。这会将所有现有记录刷新到磁盘。然后您需要实例化一个新实例来执行查询。
我正在尝试使用 GeoMesa Native API 从 accumulo 存储中插入和读取数据。我已经创建了一个 class 文件来本地使用 geomesa accumulo 存储。这是我的 java 代码:
package org.locationtech.geomesa.api;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.junit.Assert;
import org.junit.Test;
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$;
import org.locationtech.geomesa.utils.index.IndexMode$;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.type.AttributeDescriptor;
import org.opengis.filter.FilterFactory2;
import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
public class WorkerBeta {
public static void main(String[] args){
try {
DomainObjectValueSerializer dovs = new DomainObjectValueSerializer();
final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
"aj_v14",
"localhost:2181",
"hps",
"root", "9869547580",
false,
dovs,
new SimpleFeatureView<DomainObject>() {
AttributeTypeBuilder atb = new AttributeTypeBuilder();
private List<AttributeDescriptor> attributeDescriptors =
Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId")
, atb.binding(String.class).buildDescriptor("dId")
, atb.binding(Integer.class).buildDescriptor("s")
, atb.binding(Integer.class).buildDescriptor("a")
, atb.binding(Integer.class).buildDescriptor("e")
);
@Override
public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) {
f.setAttribute("rId", domainObject.rideId);
f.setAttribute("dId", domainObject.deviceId);
f.setAttribute("s", domainObject.speed);
f.setAttribute("a", domainObject.angle);
f.setAttribute("e", domainObject.error);
}
@Override
public List<AttributeDescriptor> getExtraAttributes() {
return attributeDescriptors;
}
}
);
//Inserting
final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1);
final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory();
System.out.println(index.insert(
one,
gf.createPoint(new Coordinate(-74.0, 34.0)),
date("2017-03-31T01:15:00.000Z")
));
//Read
GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder()
.within(-90.0, -180, 90, 180)
.during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z"))
.build();
Iterable<DomainObject> results = index.query(q);
int counter = 0;
for(DomainObject dm : results){
counter += 1;
System.out.println("result counter: " + counter);
dovs.toBytes(dm);
}
}
catch (Exception ex){
ex.printStackTrace();
}
}
public static class DomainObject {
public final int rideId;
public final String deviceId;
public final int angle;
public final int speed;
public final int error;
public DomainObject(int rideId, String deviceId, int angle, int speed, int error) {
this.rideId = rideId;
this.deviceId = deviceId;
this.angle = angle;
this.speed = speed;
this.error = error;
}
}
public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> {
public static final Gson gson = new Gson();
@Override
public byte[] toBytes(DomainObject o) {
return gson.toJson(o).getBytes();
}
@Override
public DomainObject fromBytes(byte[] bytes) {
return gson.fromJson(new String(bytes), DomainObject.class);
}
}
public static Date date(String s) {
return Date.from(ZonedDateTime.parse(s).toInstant());
}
}
命令的日志是:
suresh@hpss-MacBook-Air:~/GeomesaAccumuloNativeClient $ java -cp target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar org.locationtech.geomesa.api.WorkerBeta
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:host.name=192.168.1.103
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_121
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.class.path=target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/Users/suresh/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/var/folders/yk/h858t8h57nz42t6t4nqmwhcc0000gp/T/
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.name=Mac OS X
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.arch=x86_64
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.version=10.12.3
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.name=suresh
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.home=/Users/suresh
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.dir=/Users/suresh/GeomesaAccumuloNativeClient
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.accumulo.fate.zookeeper.ZooSession$ZooWatcher@73eb439a
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x15aea0c41f601a1, negotiated timeout = 30000
17/04/01 15:11:52 WARN data.AccumuloDataStore: Configured server-side iterators do not match client version - client version: 1.3.2-SNAPSHOT, server version: 1.3.0
50fa12fb-11f8-4776-bb35-95b32da9225d
[]
但是当我尝试验证插入的记录时,我无法在 created 中找到任何与 accumulo 网络界面表中插入的数据相关的特定条目。这是累积表
两条速记:
您的类型没有名为 'dtg' 的字段,GeoMesaQuery 假设有一个。要轻松解决此问题,您可以使用 'GeoMesaQuery.GeoMesaQueryBuilder.builder().include().build()'。从长远来看,原生 api 可以使用一些改进来轻松流畅地执行您想要的操作。
要查看记录是否已写入 Accumulo,您可以使用 Accumulo shell 并扫描各个表。如果表中没有任何内容,可能值得调试此 code.
可能您的插入没有刷新到磁盘。 Accumulo 使用批处理写入器来提高性能——一旦其内部缓冲区已满,这将定期写入磁盘。由于您只插入一条记录,因此不会发生这种情况。
要修复,您可以在 GeoMesaIndex
实例上调用 close
。这会将所有现有记录刷新到磁盘。然后您需要实例化一个新实例来执行查询。