Added edge state. Added edge id and cloud type to proto

This commit is contained in:
Volodymyr Babak 2020-07-07 17:59:12 +03:00
parent d43edb83fd
commit 6c3d2d986e
5 changed files with 74 additions and 10 deletions

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.edge.rpc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import com.google.common.util.concurrent.FutureCallback;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
@ -25,16 +26,24 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc;
import org.thingsboard.server.gen.edge.RequestMsg;
import org.thingsboard.server.gen.edge.ResponseMsg;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@ -56,10 +65,15 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
private String certFileResource;
@Value("${edges.rpc.ssl.private_key}")
private String privateKeyResource;
@Value("${edges.state.persistToTelemetry:false}")
private boolean persistToTelemetry;
@Autowired
private EdgeContextComponent ctx;
@Autowired
private TelemetrySubscriptionService tsSubService;
private Server server;
private ExecutorService executor;
@ -105,6 +119,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
private void onEdgeConnect(EdgeId edgeId, EdgeGrpcSession edgeGrpcSession) {
sessions.put(edgeId, edgeGrpcSession);
save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, true);
save(edgeId, DefaultDeviceStateService.LAST_CONNECT_TIME, System.currentTimeMillis());
}
private void processHandleMessages() {
@ -123,6 +139,51 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
private void onEdgeDisconnect(EdgeId edgeId) {
sessions.remove(edgeId);
save(edgeId, DefaultDeviceStateService.ACTIVITY_STATE, false);
save(edgeId, DefaultDeviceStateService.LAST_DISCONNECT_TIME, System.currentTimeMillis());
}
private void save(EdgeId edgeId, String key, long value) {
if (persistToTelemetry) {
tsSubService.saveAndNotify(
TenantId.SYS_TENANT_ID, edgeId,
Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(key, value))),
new AttributeSaveCallback(edgeId, key, value));
} else {
tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, edgeId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(edgeId, key, value));
}
}
private void save(EdgeId edgeId, String key, boolean value) {
if (persistToTelemetry) {
tsSubService.saveAndNotify(
TenantId.SYS_TENANT_ID, edgeId,
Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))),
new AttributeSaveCallback(edgeId, key, value));
} else {
tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, edgeId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(edgeId, key, value));
}
}
private static class AttributeSaveCallback implements FutureCallback<Void> {
private final EdgeId edgeId;
private final String key;
private final Object value;
AttributeSaveCallback(EdgeId edgeId, String key, Object value) {
this.edgeId = edgeId;
this.key = key;
this.value = value;
}
@Override
public void onSuccess(@javax.annotation.Nullable Void result) {
log.trace("[{}] Successfully updated attribute [{}] with value [{}]", edgeId, key, value);
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to update attribute [{}] with value [{}]", edgeId, key, value, t);
}
}
}

View File

@ -1042,11 +1042,14 @@ public final class EdgeGrpcSession implements Closeable {
private EdgeConfiguration constructEdgeConfigProto(Edge edge) throws JsonProcessingException {
return EdgeConfiguration.newBuilder()
.setEdgeIdMSB(edge.getId().getId().getMostSignificantBits())
.setEdgeIdLSB(edge.getId().getId().getLeastSignificantBits())
.setTenantIdMSB(edge.getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(edge.getTenantId().getId().getLeastSignificantBits())
.setName(edge.getName())
.setRoutingKey(edge.getRoutingKey())
.setType(edge.getType())
.setCloudType("CE")
.build();
}

View File

@ -591,6 +591,8 @@ edges:
max_read_records_count: "${EDGES_RPC_STORAGE_MAX_READ_RECORDS_COUNT:50}"
no_read_records_sleep: "${EDGES_RPC_NO_READ_RECORDS_SLEEP:1000}"
sleep_between_batches: "${EDGES_RPC_SLEEP_BETWEEN_BATCHES:1000}"
state:
persistToTelemetry: "${EDGES_PERSIST_STATE_TO_TELEMETRY:false}"
swagger:
api_path_regex: "${SWAGGER_API_PATH_REGEX:/api.*}"

View File

@ -84,12 +84,14 @@ message ConnectResponseMsg {
}
message EdgeConfiguration {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
string name = 3;
string routingKey = 4;
string type = 5;
string cloudType = 6;
int64 edgeIdMSB = 1;
int64 edgeIdLSB = 2;
int64 tenantIdMSB = 3;
int64 tenantIdLSB = 4;
string name = 5;
string routingKey = 6;
string type = 7;
string cloudType = 8;
}
enum UpdateMsgType {

View File

@ -37,7 +37,6 @@ import org.thingsboard.server.common.data.page.TextPageLink;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.security.UserCredentials;
import org.thingsboard.server.dao.customer.CustomerDao;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
@ -86,9 +85,6 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic
@Autowired
private CustomerDao customerDao;
@Autowired
private EdgeService edgeService;
@Override
public User findUserByEmail(TenantId tenantId, String email) {
log.trace("Executing findUserByEmail [{}]", email);