执行 flush 方法,用于在不关闭 writer 的情况下将突变发送到 accumulo
Executing flush method, for sending mutations to accumulo without closing the writer
我正在使用 Geomesa Native Client 将数据本地写入 accumulo 存储。这是我的 java 代码
package org.locationtech.geomesa.api;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.junit.Assert;
import org.junit.Test;
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$;
import org.locationtech.geomesa.utils.index.IndexMode$;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.type.AttributeDescriptor;
import org.opengis.filter.FilterFactory2;
import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
public class WorkerBeta {
public static void main(String[] args){
try {
DomainObjectValueSerializer dovs = new DomainObjectValueSerializer();
final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
"aj_v14",
"localhost:2181",
"hps",
"root", "9869547580",
false,
dovs,
new SimpleFeatureView<DomainObject>() {
AttributeTypeBuilder atb = new AttributeTypeBuilder();
private List<AttributeDescriptor> attributeDescriptors =
Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId")
, atb.binding(String.class).buildDescriptor("dId")
, atb.binding(Integer.class).buildDescriptor("s")
, atb.binding(Integer.class).buildDescriptor("a")
, atb.binding(Integer.class).buildDescriptor("e")
);
@Override
public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) {
f.setAttribute("rId", domainObject.rideId);
f.setAttribute("dId", domainObject.deviceId);
f.setAttribute("s", domainObject.speed);
f.setAttribute("a", domainObject.angle);
f.setAttribute("e", domainObject.error);
}
@Override
public List<AttributeDescriptor> getExtraAttributes() {
return attributeDescriptors;
}
}
);
//Inserting
final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1);
final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory();
System.out.println(index.insert(
one,
gf.createPoint(new Coordinate(-74.0, 34.0)),
date("2017-03-31T01:15:00.000Z")
));
//Read
GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder()
.within(-90.0, -180, 90, 180)
.during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z"))
.build();
Iterable<DomainObject> results = index.query(q);
int counter = 0;
for(DomainObject dm : results){
counter += 1;
System.out.println("result counter: " + counter);
dovs.toBytes(dm);
}
}
catch (Exception ex){
ex.printStackTrace();
}
index.close();
}
public static class DomainObject {
public final int rideId;
public final String deviceId;
public final int angle;
public final int speed;
public final int error;
public DomainObject(int rideId, String deviceId, int angle, int speed, int error) {
this.rideId = rideId;
this.deviceId = deviceId;
this.angle = angle;
this.speed = speed;
this.error = error;
}
}
public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> {
public static final Gson gson = new Gson();
@Override
public byte[] toBytes(DomainObject o) {
return gson.toJson(o).getBytes();
}
@Override
public DomainObject fromBytes(byte[] bytes) {
return gson.fromJson(new String(bytes), DomainObject.class);
}
}
public static Date date(String s) {
return Date.from(ZonedDateTime.parse(s).toInstant());
}
}
此代码的问题是,我需要每次为新的插入请求创建 index
对象并调用 index.close()
来反映相同的内容。但是一旦 index.close()
被调用,我就不能再执行 insert()
了。但是我将以非常高的速率接受来自队列的插入请求,我不想每次都创建 index
对象。我该怎么做?
简而言之,我如何在不调用 close()
.
的情况下刷新写入
我创建了 geomesa 客户端 class 文件以在本机使用 geomesa。下面是相同的部分实现,它显示了如何在不调用关闭的情况下使用 AccumuloAppendFeatureWriter
刷新。
public class GeomesaClient {
private AccumuloDataStore ds = null;
private AccumuloAppendFeatureWriter fw = null;
private SimpleFeatureSource sfs = null;
private String tableName = "";
private FeatureStore fst = null;
private SimpleFeatureType sft;
public GeomesaClient(Map<String, String> dsConf) throws Exception {
this.ds = (AccumuloDataStore) DataStoreFinder.getDataStore(dsConf);
this.tableName = dsConf.get("tableName");
sft = createFeatureType();
if(!Arrays.asList(this.ds.getTypeNames()).contains(sft.getTypeName())){
ds.createSchema(sft);
}
this.fst = (FeatureStore)sfs;
this.fw = (AccumuloAppendFeatureWriter) (this.ds.getFeatureWriterAppend(sft.getTypeName(),
Transaction.AUTO_COMMIT));
this.sfs = ds.getFeatureSource(sft.getTypeName());
}
/*
Flush with AccumuloAppendFeatureWriter
*/
public void flush(boolean force) {
fw.flush();
}
}
我正在使用 Geomesa Native Client 将数据本地写入 accumulo 存储。这是我的 java 代码
package org.locationtech.geomesa.api;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.junit.Assert;
import org.junit.Test;
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$;
import org.locationtech.geomesa.utils.index.IndexMode$;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.type.AttributeDescriptor;
import org.opengis.filter.FilterFactory2;
import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
public class WorkerBeta {
public static void main(String[] args){
try {
DomainObjectValueSerializer dovs = new DomainObjectValueSerializer();
final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
"aj_v14",
"localhost:2181",
"hps",
"root", "9869547580",
false,
dovs,
new SimpleFeatureView<DomainObject>() {
AttributeTypeBuilder atb = new AttributeTypeBuilder();
private List<AttributeDescriptor> attributeDescriptors =
Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId")
, atb.binding(String.class).buildDescriptor("dId")
, atb.binding(Integer.class).buildDescriptor("s")
, atb.binding(Integer.class).buildDescriptor("a")
, atb.binding(Integer.class).buildDescriptor("e")
);
@Override
public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) {
f.setAttribute("rId", domainObject.rideId);
f.setAttribute("dId", domainObject.deviceId);
f.setAttribute("s", domainObject.speed);
f.setAttribute("a", domainObject.angle);
f.setAttribute("e", domainObject.error);
}
@Override
public List<AttributeDescriptor> getExtraAttributes() {
return attributeDescriptors;
}
}
);
//Inserting
final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1);
final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory();
System.out.println(index.insert(
one,
gf.createPoint(new Coordinate(-74.0, 34.0)),
date("2017-03-31T01:15:00.000Z")
));
//Read
GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder()
.within(-90.0, -180, 90, 180)
.during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z"))
.build();
Iterable<DomainObject> results = index.query(q);
int counter = 0;
for(DomainObject dm : results){
counter += 1;
System.out.println("result counter: " + counter);
dovs.toBytes(dm);
}
}
catch (Exception ex){
ex.printStackTrace();
}
index.close();
}
public static class DomainObject {
public final int rideId;
public final String deviceId;
public final int angle;
public final int speed;
public final int error;
public DomainObject(int rideId, String deviceId, int angle, int speed, int error) {
this.rideId = rideId;
this.deviceId = deviceId;
this.angle = angle;
this.speed = speed;
this.error = error;
}
}
public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> {
public static final Gson gson = new Gson();
@Override
public byte[] toBytes(DomainObject o) {
return gson.toJson(o).getBytes();
}
@Override
public DomainObject fromBytes(byte[] bytes) {
return gson.fromJson(new String(bytes), DomainObject.class);
}
}
public static Date date(String s) {
return Date.from(ZonedDateTime.parse(s).toInstant());
}
}
此代码的问题是,我需要每次为新的插入请求创建 index
对象并调用 index.close()
来反映相同的内容。但是一旦 index.close()
被调用,我就不能再执行 insert()
了。但是我将以非常高的速率接受来自队列的插入请求,我不想每次都创建 index
对象。我该怎么做?
简而言之,我如何在不调用 close()
.
我创建了 geomesa 客户端 class 文件以在本机使用 geomesa。下面是相同的部分实现,它显示了如何在不调用关闭的情况下使用 AccumuloAppendFeatureWriter
刷新。
public class GeomesaClient {
private AccumuloDataStore ds = null;
private AccumuloAppendFeatureWriter fw = null;
private SimpleFeatureSource sfs = null;
private String tableName = "";
private FeatureStore fst = null;
private SimpleFeatureType sft;
public GeomesaClient(Map<String, String> dsConf) throws Exception {
this.ds = (AccumuloDataStore) DataStoreFinder.getDataStore(dsConf);
this.tableName = dsConf.get("tableName");
sft = createFeatureType();
if(!Arrays.asList(this.ds.getTypeNames()).contains(sft.getTypeName())){
ds.createSchema(sft);
}
this.fst = (FeatureStore)sfs;
this.fw = (AccumuloAppendFeatureWriter) (this.ds.getFeatureWriterAppend(sft.getTypeName(),
Transaction.AUTO_COMMIT));
this.sfs = ds.getFeatureSource(sft.getTypeName());
}
/*
Flush with AccumuloAppendFeatureWriter
*/
public void flush(boolean force) {
fw.flush();
}
}