使用 GeoMesa Native API 向 accumulo 中插入数据

Using GeoMesa Native API to insert data in accumulo

我正在尝试使用 GeoMesa Native API 从 accumulo 存储中插入和读取数据。我已经创建了一个 class 文件来本地使用 geomesa accumulo 存储。这是我的 java 代码:

package org.locationtech.geomesa.api;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.junit.Assert;
import org.junit.Test;
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$;
import org.locationtech.geomesa.utils.index.IndexMode$;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.type.AttributeDescriptor;
import org.opengis.filter.FilterFactory2;

import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class WorkerBeta {
    public static void main(String[] args){
        try {
            DomainObjectValueSerializer dovs = new DomainObjectValueSerializer();
            final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
            "aj_v14",
            "localhost:2181",
            "hps",
            "root", "9869547580",
            false,
            dovs,
            new SimpleFeatureView<DomainObject>() {
              AttributeTypeBuilder atb = new AttributeTypeBuilder();
              private List<AttributeDescriptor> attributeDescriptors =
                Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId")
                    , atb.binding(String.class).buildDescriptor("dId")
                    , atb.binding(Integer.class).buildDescriptor("s")
                    , atb.binding(Integer.class).buildDescriptor("a")
                    , atb.binding(Integer.class).buildDescriptor("e")
                );
              @Override
              public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) {
                f.setAttribute("rId", domainObject.rideId);
                f.setAttribute("dId", domainObject.deviceId);
                f.setAttribute("s", domainObject.speed);
                f.setAttribute("a", domainObject.angle);
                f.setAttribute("e", domainObject.error);
              }

              @Override
              public List<AttributeDescriptor> getExtraAttributes() {
                return attributeDescriptors;
              }
            }
        );

        //Inserting 
        final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1);
        final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory();
        System.out.println(index.insert(
                one,
                gf.createPoint(new Coordinate(-74.0, 34.0)),
                date("2017-03-31T01:15:00.000Z")
            ));

            //Read 
            GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder()
                .within(-90.0, -180, 90, 180)
                .during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z"))
                .build();
            Iterable<DomainObject> results = index.query(q);
            int counter = 0;
            for(DomainObject dm : results){
                counter += 1;
                System.out.println("result counter: " + counter);
                dovs.toBytes(dm);
            }
        }
        catch (Exception ex){
      ex.printStackTrace();
        }
    }
    public static class DomainObject {
      public final int rideId;
      public final String deviceId;
      public final int angle;
      public final int speed;
      public final int error;

      public DomainObject(int rideId, String deviceId, int angle, int speed, int error) {
          this.rideId = rideId;
          this.deviceId = deviceId;
          this.angle = angle;
          this.speed = speed;
          this.error = error;
      }
    }
    public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> {
        public static final Gson gson = new Gson();
        @Override
        public byte[] toBytes(DomainObject o) {
            return gson.toJson(o).getBytes();
        }
        @Override
        public DomainObject fromBytes(byte[] bytes) {
            return gson.fromJson(new String(bytes), DomainObject.class);
        }
    }
    public static Date date(String s) {
        return Date.from(ZonedDateTime.parse(s).toInstant());
    }
}

命令的日志是:

suresh@hpss-MacBook-Air:~/GeomesaAccumuloNativeClient $ java -cp target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar org.locationtech.geomesa.api.WorkerBeta
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:host.name=192.168.1.103
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_121
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.class.path=target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/Users/suresh/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/var/folders/yk/h858t8h57nz42t6t4nqmwhcc0000gp/T/
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.name=Mac OS X
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.arch=x86_64
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.version=10.12.3
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.name=suresh
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.home=/Users/suresh
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.dir=/Users/suresh/GeomesaAccumuloNativeClient
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.accumulo.fate.zookeeper.ZooSession$ZooWatcher@73eb439a
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x15aea0c41f601a1, negotiated timeout = 30000
17/04/01 15:11:52 WARN data.AccumuloDataStore: Configured server-side iterators do not match client version - client version: 1.3.2-SNAPSHOT, server version: 1.3.0
50fa12fb-11f8-4776-bb35-95b32da9225d
[]

但是当我尝试验证插入的记录时,我无法在 created 中找到任何与 accumulo 网络界面表中插入的数据相关的特定条目。这是累积表 的屏幕截图。如果我遗漏了什么,请纠正我。提前致谢。

两条速记:

  1. 您的类型没有名为 'dtg' 的字段,GeoMesaQuery 假设有一个。要轻松解决此问题,您可以使用 'GeoMesaQuery.GeoMesaQueryBuilder.builder().include().build()'。从长远来看,原生 api 可以使用一些改进来轻松流畅地执行您想要的操作。

  2. 要查看记录是否已写入 Accumulo,您可以使用 Accumulo shell 并扫描各个表。如果表中没有任何内容,可能值得调试此 code.

可能您的插入没有刷新到磁盘。 Accumulo 使用批处理写入器来提高性能——一旦其内部缓冲区已满,这将定期写入磁盘。由于您只插入一条记录,因此不会发生这种情况。 要修复,您可以在 GeoMesaIndex 实例上调用 close。这会将所有现有记录刷新到磁盘。然后您需要实例化一个新实例来执行查询。