Remove Edge request messages - send multiple data messages in single rpc message
This commit is contained in:
parent
90d945cb2c
commit
cfca80defe
@ -490,7 +490,25 @@ public class EdgeController extends BaseController {
|
|||||||
if (fromEdgeSyncResponse.isSuccess()) {
|
if (fromEdgeSyncResponse.isSuccess()) {
|
||||||
response.setResult(new ResponseEntity<>(HttpStatus.OK));
|
response.setResult(new ResponseEntity<>(HttpStatus.OK));
|
||||||
} else {
|
} else {
|
||||||
response.setErrorResult(new ThingsboardException("Edge is not connected", ThingsboardErrorCode.GENERAL));
|
response.setErrorResult(new ThingsboardException(fromEdgeSyncResponse.getError(), ThingsboardErrorCode.GENERAL));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ApiOperation(value = "Is edge sync process is active (isEdgeSyncProcessActive)",
|
||||||
|
notes = "Returns 'true' if edge is currently in sync process, 'false' - otherwise." + TENANT_AUTHORITY_PARAGRAPH)
|
||||||
|
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
|
||||||
|
@GetMapping(value = "/edge/sync/{edgeId}/active")
|
||||||
|
public Boolean isEdgeSyncProcessActive(
|
||||||
|
@Parameter(description = EDGE_ID_PARAM_DESCRIPTION, required = true)
|
||||||
|
@PathVariable("edgeId") String strEdgeId) throws ThingsboardException {
|
||||||
|
checkParameter("edgeId", strEdgeId);
|
||||||
|
if (isEdgesEnabled() && edgeRpcServiceOpt.isPresent()) {
|
||||||
|
EdgeId edgeId = new EdgeId(toUUID(strEdgeId));
|
||||||
|
edgeId = checkNotNull(edgeId);
|
||||||
|
Edge edge = checkEdgeId(edgeId, Operation.READ);
|
||||||
|
return edgeRpcServiceOpt.get().isEdgeSyncProcessActive(edge.getTenantId(), edge.getId());
|
||||||
|
} else {
|
||||||
|
throw new ThingsboardException("Edges support disabled", ThingsboardErrorCode.GENERAL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -292,17 +292,31 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
|||||||
session.startSyncProcess(true);
|
session.startSyncProcess(true);
|
||||||
success = true;
|
success = true;
|
||||||
}
|
}
|
||||||
clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success));
|
clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success, null));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void processSyncRequest(ToEdgeSyncRequest request, Consumer<FromEdgeSyncResponse> responseConsumer) {
|
public void processSyncRequest(ToEdgeSyncRequest request, Consumer<FromEdgeSyncResponse> responseConsumer) {
|
||||||
log.trace("[{}][{}] Processing sync edge request [{}]", request.getTenantId(), request.getId(), request.getEdgeId());
|
|
||||||
UUID requestId = request.getId();
|
UUID requestId = request.getId();
|
||||||
localSyncEdgeRequests.put(requestId, responseConsumer);
|
EdgeGrpcSession session = sessions.get(request.getEdgeId());
|
||||||
clusterService.pushEdgeSyncRequestToCore(request);
|
if (!session.isSyncCompleted()) {
|
||||||
scheduleSyncRequestTimeout(request, requestId);
|
responseConsumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false, "Sync process is active at the moment"));
|
||||||
|
} else {
|
||||||
|
log.trace("[{}][{}] Processing sync edge request [{}]", request.getTenantId(), request.getId(), request.getEdgeId());
|
||||||
|
localSyncEdgeRequests.put(requestId, responseConsumer);
|
||||||
|
clusterService.pushEdgeSyncRequestToCore(request);
|
||||||
|
scheduleSyncRequestTimeout(request, requestId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean isEdgeSyncProcessActive(TenantId tenantId, EdgeId edgeId) {
|
||||||
|
EdgeGrpcSession session = sessions.get(edgeId);
|
||||||
|
if (session == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return !session.isSyncCompleted();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void scheduleSyncRequestTimeout(ToEdgeSyncRequest request, UUID requestId) {
|
private void scheduleSyncRequestTimeout(ToEdgeSyncRequest request, UUID requestId) {
|
||||||
@ -312,7 +326,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
|
|||||||
Consumer<FromEdgeSyncResponse> consumer = localSyncEdgeRequests.remove(requestId);
|
Consumer<FromEdgeSyncResponse> consumer = localSyncEdgeRequests.remove(requestId);
|
||||||
if (consumer != null) {
|
if (consumer != null) {
|
||||||
log.trace("[{}] timeout for processing sync edge request.", requestId);
|
log.trace("[{}] timeout for processing sync edge request.", requestId);
|
||||||
consumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false));
|
consumer.accept(new FromEdgeSyncResponse(requestId, request.getTenantId(), request.getEdgeId(), false, "Edge is not connected"));
|
||||||
}
|
}
|
||||||
}, 20, TimeUnit.SECONDS);
|
}, 20, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -247,6 +247,7 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
}
|
}
|
||||||
}, ctx.getGrpcCallbackExecutorService());
|
}, ctx.getGrpcCallbackExecutorService());
|
||||||
} else {
|
} else {
|
||||||
|
log.info("[{}][{}] sync process completed", this.tenantId, edge.getId());
|
||||||
DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder()
|
DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder()
|
||||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||||
.setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build())
|
.setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build())
|
||||||
@ -325,7 +326,11 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void sendDownlinkMsg(ResponseMsg downlinkMsg) {
|
private void sendDownlinkMsg(ResponseMsg downlinkMsg) {
|
||||||
log.trace("[{}][{}] Sending downlink msg [{}]", this.tenantId, this.sessionId, downlinkMsg);
|
if (downlinkMsg.getDownlinkMsg().getWidgetTypeUpdateMsgCount() > 0) {
|
||||||
|
log.trace("[{}][{}] Sending downlink widgetTypeUpdateMsg, downlinkMsgId = {}", this.tenantId, this.sessionId, downlinkMsg.getDownlinkMsg().getDownlinkMsgId());
|
||||||
|
} else {
|
||||||
|
log.trace("[{}][{}] Sending downlink msg [{}]", this.tenantId, this.sessionId, downlinkMsg);
|
||||||
|
}
|
||||||
if (isConnected()) {
|
if (isConnected()) {
|
||||||
downlinkMsgLock.lock();
|
downlinkMsgLock.lock();
|
||||||
try {
|
try {
|
||||||
@ -337,7 +342,7 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
} finally {
|
} finally {
|
||||||
downlinkMsgLock.unlock();
|
downlinkMsgLock.unlock();
|
||||||
}
|
}
|
||||||
log.trace("[{}][{}] Response msg successfully sent [{}]", this.tenantId, this.sessionId, downlinkMsg);
|
log.trace("[{}][{}] Response msg successfully sent. downlinkMsgId = {}", this.tenantId, this.sessionId, downlinkMsg.getDownlinkMsg().getDownlinkMsgId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -551,9 +556,13 @@ public final class EdgeGrpcSession implements Closeable {
|
|||||||
DownlinkMsg downlinkMsg = null;
|
DownlinkMsg downlinkMsg = null;
|
||||||
try {
|
try {
|
||||||
switch (edgeEvent.getAction()) {
|
switch (edgeEvent.getAction()) {
|
||||||
case UPDATED, ADDED, DELETED, ASSIGNED_TO_EDGE, UNASSIGNED_FROM_EDGE, ALARM_ACK, ALARM_CLEAR, ALARM_DELETE, CREDENTIALS_UPDATED, RELATION_ADD_OR_UPDATE, RELATION_DELETED, CREDENTIALS_REQUEST, RPC_CALL, ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> {
|
case UPDATED, ADDED, DELETED, ASSIGNED_TO_EDGE, UNASSIGNED_FROM_EDGE, ALARM_ACK, ALARM_CLEAR, ALARM_DELETE, CREDENTIALS_UPDATED, RELATION_ADD_OR_UPDATE, RELATION_DELETED, RPC_CALL, ASSIGNED_TO_CUSTOMER, UNASSIGNED_FROM_CUSTOMER, ADDED_COMMENT, UPDATED_COMMENT, DELETED_COMMENT -> {
|
||||||
downlinkMsg = convertEntityEventToDownlink(edgeEvent);
|
downlinkMsg = convertEntityEventToDownlink(edgeEvent);
|
||||||
log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg);
|
if (downlinkMsg != null && downlinkMsg.getWidgetTypeUpdateMsgCount() > 0) {
|
||||||
|
log.trace("[{}][{}] widgetTypeUpdateMsg message processed, downlinkMsgId = {}", this.tenantId, this.sessionId, downlinkMsg.getDownlinkMsgId());
|
||||||
|
} else {
|
||||||
|
log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED ->
|
case ATTRIBUTES_UPDATED, POST_ATTRIBUTES, ATTRIBUTES_DELETED, TIMESERIES_UPDATED ->
|
||||||
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
|
downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edge, edgeEvent);
|
||||||
|
|||||||
@ -33,4 +33,6 @@ public interface EdgeRpcService {
|
|||||||
void deleteEdge(TenantId tenantId, EdgeId edgeId);
|
void deleteEdge(TenantId tenantId, EdgeId edgeId);
|
||||||
|
|
||||||
void processSyncRequest(ToEdgeSyncRequest request, Consumer<FromEdgeSyncResponse> responseConsumer);
|
void processSyncRequest(ToEdgeSyncRequest request, Consumer<FromEdgeSyncResponse> responseConsumer);
|
||||||
|
|
||||||
|
Boolean isEdgeSyncProcessActive(TenantId tenantId, EdgeId edgeId);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.edge.rpc;
|
package org.thingsboard.server.service.edge.rpc;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
import org.thingsboard.server.common.data.Customer;
|
import org.thingsboard.server.common.data.Customer;
|
||||||
import org.thingsboard.server.common.data.edge.Edge;
|
import org.thingsboard.server.common.data.edge.Edge;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
@ -53,6 +54,7 @@ public class EdgeSyncCursor {
|
|||||||
|
|
||||||
private final List<EdgeEventFetcher> fetchers = new LinkedList<>();
|
private final List<EdgeEventFetcher> fetchers = new LinkedList<>();
|
||||||
|
|
||||||
|
@Getter
|
||||||
private int currentIdx = 0;
|
private int currentIdx = 0;
|
||||||
|
|
||||||
public EdgeSyncCursor(EdgeContextComponent ctx, Edge edge, boolean fullSync) {
|
public EdgeSyncCursor(EdgeContextComponent ctx, Edge edge, boolean fullSync) {
|
||||||
@ -62,12 +64,12 @@ public class EdgeSyncCursor {
|
|||||||
fetchers.add(new RuleChainsEdgeEventFetcher(ctx.getRuleChainService()));
|
fetchers.add(new RuleChainsEdgeEventFetcher(ctx.getRuleChainService()));
|
||||||
fetchers.add(new AdminSettingsEdgeEventFetcher(ctx.getAdminSettingsService()));
|
fetchers.add(new AdminSettingsEdgeEventFetcher(ctx.getAdminSettingsService()));
|
||||||
fetchers.add(new TenantAdminUsersEdgeEventFetcher(ctx.getUserService()));
|
fetchers.add(new TenantAdminUsersEdgeEventFetcher(ctx.getUserService()));
|
||||||
Customer publicCustomer = ctx.getCustomerService().findOrCreatePublicCustomer(edge.getTenantId());
|
}
|
||||||
fetchers.add(new CustomerEdgeEventFetcher(publicCustomer.getId()));
|
Customer publicCustomer = ctx.getCustomerService().findOrCreatePublicCustomer(edge.getTenantId());
|
||||||
if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) {
|
fetchers.add(new CustomerEdgeEventFetcher(publicCustomer.getId()));
|
||||||
fetchers.add(new CustomerEdgeEventFetcher(edge.getCustomerId()));
|
if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) {
|
||||||
fetchers.add(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId()));
|
fetchers.add(new CustomerEdgeEventFetcher(edge.getCustomerId()));
|
||||||
}
|
fetchers.add(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId()));
|
||||||
}
|
}
|
||||||
fetchers.add(new DashboardsEdgeEventFetcher(ctx.getDashboardService()));
|
fetchers.add(new DashboardsEdgeEventFetcher(ctx.getDashboardService()));
|
||||||
fetchers.add(new DefaultProfilesEdgeEventFetcher(ctx.getDeviceProfileService(), ctx.getAssetProfileService()));
|
fetchers.add(new DefaultProfilesEdgeEventFetcher(ctx.getDeviceProfileService(), ctx.getAssetProfileService()));
|
||||||
@ -102,9 +104,4 @@ public class EdgeSyncCursor {
|
|||||||
currentIdx++;
|
currentIdx++;
|
||||||
return edgeEventFetcher;
|
return edgeEventFetcher;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getCurrentIdx() {
|
|
||||||
return currentIdx;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -373,7 +373,7 @@ public abstract class BaseEdgeProcessor {
|
|||||||
private boolean doSaveIfEdgeIsOffline(EdgeEventType type,
|
private boolean doSaveIfEdgeIsOffline(EdgeEventType type,
|
||||||
EdgeEventActionType action) {
|
EdgeEventActionType action) {
|
||||||
return switch (action) {
|
return switch (action) {
|
||||||
case TIMESERIES_UPDATED, ALARM_ACK, ALARM_CLEAR, ALARM_ASSIGNED, ALARM_UNASSIGNED, CREDENTIALS_REQUEST, ADDED_COMMENT, UPDATED_COMMENT ->
|
case TIMESERIES_UPDATED, ALARM_ACK, ALARM_CLEAR, ALARM_ASSIGNED, ALARM_UNASSIGNED, ADDED_COMMENT, UPDATED_COMMENT ->
|
||||||
true;
|
true;
|
||||||
default -> switch (type) {
|
default -> switch (type) {
|
||||||
case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE, WIDGETS_BUNDLE, WIDGET_TYPE,
|
case ALARM, ALARM_COMMENT, RULE_CHAIN, RULE_CHAIN_METADATA, USER, CUSTOMER, TENANT, TENANT_PROFILE, WIDGETS_BUNDLE, WIDGET_TYPE,
|
||||||
|
|||||||
@ -43,7 +43,6 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
|
|||||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
||||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg;
|
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg;
|
||||||
import org.thingsboard.server.dao.exception.DataValidationException;
|
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||||
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg;
|
|
||||||
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg;
|
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg;
|
||||||
import org.thingsboard.server.gen.edge.v1.DeviceRpcCallMsg;
|
import org.thingsboard.server.gen.edge.v1.DeviceRpcCallMsg;
|
||||||
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
|
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
|
||||||
@ -70,7 +69,7 @@ public abstract class DeviceEdgeProcessor extends BaseDeviceProcessor implements
|
|||||||
case ENTITY_CREATED_RPC_MESSAGE:
|
case ENTITY_CREATED_RPC_MESSAGE:
|
||||||
case ENTITY_UPDATED_RPC_MESSAGE:
|
case ENTITY_UPDATED_RPC_MESSAGE:
|
||||||
saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, edge);
|
saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, edge);
|
||||||
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null);
|
return Futures.immediateFuture(null);
|
||||||
case ENTITY_DELETED_RPC_MESSAGE:
|
case ENTITY_DELETED_RPC_MESSAGE:
|
||||||
Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId);
|
Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId);
|
||||||
if (deviceToDelete != null) {
|
if (deviceToDelete != null) {
|
||||||
@ -232,6 +231,12 @@ public abstract class DeviceEdgeProcessor extends BaseDeviceProcessor implements
|
|||||||
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
|
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
|
||||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||||
.addDeviceUpdateMsg(deviceUpdateMsg);
|
.addDeviceUpdateMsg(deviceUpdateMsg);
|
||||||
|
|
||||||
|
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(edgeEvent.getTenantId(), deviceId);
|
||||||
|
DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = ((DeviceMsgConstructor)
|
||||||
|
deviceMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)).constructDeviceCredentialsUpdatedMsg(deviceCredentials);
|
||||||
|
builder.addDeviceCredentialsUpdateMsg(deviceCredentialsUpdateMsg).build();
|
||||||
|
|
||||||
if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) {
|
if (UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) {
|
||||||
DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(edgeEvent.getTenantId(), device.getDeviceProfileId());
|
DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(edgeEvent.getTenantId(), device.getDeviceProfileId());
|
||||||
deviceProfile = checkIfDeviceProfileDefaultFieldsAssignedToEdge(edgeEvent.getTenantId(), edgeId, deviceProfile, edgeVersion);
|
deviceProfile = checkIfDeviceProfileDefaultFieldsAssignedToEdge(edgeEvent.getTenantId(), edgeId, deviceProfile, edgeVersion);
|
||||||
@ -269,22 +274,7 @@ public abstract class DeviceEdgeProcessor extends BaseDeviceProcessor implements
|
|||||||
deviceMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion))
|
deviceMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion))
|
||||||
.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody()))
|
.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody()))
|
||||||
.build();
|
.build();
|
||||||
case CREDENTIALS_REQUEST:
|
|
||||||
return convertCredentialsRequestEventToDownlink(edgeEvent);
|
|
||||||
}
|
}
|
||||||
return downlinkMsg;
|
return downlinkMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
private DownlinkMsg convertCredentialsRequestEventToDownlink(EdgeEvent edgeEvent) {
|
|
||||||
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
|
|
||||||
DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = DeviceCredentialsRequestMsg.newBuilder()
|
|
||||||
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
|
|
||||||
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
|
|
||||||
.build();
|
|
||||||
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
|
|
||||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
|
||||||
.addDeviceCredentialsRequestMsg(deviceCredentialsRequestMsg);
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -61,10 +61,19 @@ public class RuleChainEdgeProcessor extends BaseEdgeProcessor {
|
|||||||
RuleChainUpdateMsg ruleChainUpdateMsg = ((RuleChainMsgConstructor)
|
RuleChainUpdateMsg ruleChainUpdateMsg = ((RuleChainMsgConstructor)
|
||||||
ruleChainMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion))
|
ruleChainMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion))
|
||||||
.constructRuleChainUpdatedMsg(msgType, ruleChain, isRoot);
|
.constructRuleChainUpdatedMsg(msgType, ruleChain, isRoot);
|
||||||
downlinkMsg = DownlinkMsg.newBuilder()
|
|
||||||
|
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
|
||||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||||
.addRuleChainUpdateMsg(ruleChainUpdateMsg)
|
.addRuleChainUpdateMsg(ruleChainUpdateMsg);
|
||||||
.build();
|
|
||||||
|
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edgeEvent.getTenantId(), ruleChainId);
|
||||||
|
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = ((RuleChainMsgConstructor)
|
||||||
|
ruleChainMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion))
|
||||||
|
.constructRuleChainMetadataUpdatedMsg(edgeEvent.getTenantId(), msgType, ruleChainMetaData, edgeVersion);
|
||||||
|
if (ruleChainMetadataUpdateMsg != null) {
|
||||||
|
builder.addRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg);
|
||||||
|
}
|
||||||
|
downlinkMsg = builder.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case DELETED, UNASSIGNED_FROM_EDGE -> downlinkMsg = DownlinkMsg.newBuilder()
|
case DELETED, UNASSIGNED_FROM_EDGE -> downlinkMsg = DownlinkMsg.newBuilder()
|
||||||
|
|||||||
@ -43,10 +43,16 @@ public class UserEdgeProcessor extends BaseEdgeProcessor {
|
|||||||
User user = userService.findUserById(edgeEvent.getTenantId(), userId);
|
User user = userService.findUserById(edgeEvent.getTenantId(), userId);
|
||||||
if (user != null) {
|
if (user != null) {
|
||||||
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
||||||
downlinkMsg = DownlinkMsg.newBuilder()
|
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
|
||||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||||
.addUserUpdateMsg(((UserMsgConstructor) userMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)).constructUserUpdatedMsg(msgType, user))
|
.addUserUpdateMsg(((UserMsgConstructor) userMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)).constructUserUpdatedMsg(msgType, user));
|
||||||
.build();
|
UserCredentials userCredentialsByUserId = userService.findUserCredentialsByUserId(edgeEvent.getTenantId(), userId);
|
||||||
|
if (userCredentialsByUserId != null && userCredentialsByUserId.isEnabled()) {
|
||||||
|
UserCredentialsUpdateMsg userCredentialsUpdateMsg =
|
||||||
|
((UserMsgConstructor) userMsgConstructorFactory.getMsgConstructorByEdgeVersion(edgeVersion)).constructUserCredentialsUpdatedMsg(userCredentialsByUserId);
|
||||||
|
builder.addUserCredentialsUpdateMsg(userCredentialsUpdateMsg);
|
||||||
|
}
|
||||||
|
downlinkMsg = builder.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case DELETED -> downlinkMsg = DownlinkMsg.newBuilder()
|
case DELETED -> downlinkMsg = DownlinkMsg.newBuilder()
|
||||||
|
|||||||
@ -43,7 +43,7 @@ public enum EdgeEventActionType {
|
|||||||
DELETED_COMMENT(ActionType.DELETED_COMMENT),
|
DELETED_COMMENT(ActionType.DELETED_COMMENT),
|
||||||
ASSIGNED_TO_EDGE(ActionType.ASSIGNED_TO_EDGE),
|
ASSIGNED_TO_EDGE(ActionType.ASSIGNED_TO_EDGE),
|
||||||
UNASSIGNED_FROM_EDGE(ActionType.UNASSIGNED_FROM_EDGE),
|
UNASSIGNED_FROM_EDGE(ActionType.UNASSIGNED_FROM_EDGE),
|
||||||
CREDENTIALS_REQUEST(null),
|
CREDENTIALS_REQUEST(null), // deprecated
|
||||||
ENTITY_MERGE_REQUEST(null); // deprecated
|
ENTITY_MERGE_REQUEST(null); // deprecated
|
||||||
|
|
||||||
private final ActionType actionType;
|
private final ActionType actionType;
|
||||||
|
|||||||
@ -25,12 +25,13 @@ import java.util.UUID;
|
|||||||
@Data
|
@Data
|
||||||
public class FromEdgeSyncResponse implements EdgeSessionMsg {
|
public class FromEdgeSyncResponse implements EdgeSessionMsg {
|
||||||
|
|
||||||
private static final long serialVersionUID = -6360890886315347486L;
|
private static final long serialVersionUID = -6360890556315667486L;
|
||||||
|
|
||||||
private final UUID id;
|
private final UUID id;
|
||||||
private final TenantId tenantId;
|
private final TenantId tenantId;
|
||||||
private final EdgeId edgeId;
|
private final EdgeId edgeId;
|
||||||
private final boolean success;
|
private final boolean success;
|
||||||
|
private final String error;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MsgType getMsgType() {
|
public MsgType getMsgType() {
|
||||||
|
|||||||
@ -167,7 +167,8 @@ public class ProtoUtils {
|
|||||||
new UUID(proto.getResponseIdMSB(), proto.getResponseIdLSB()),
|
new UUID(proto.getResponseIdMSB(), proto.getResponseIdLSB()),
|
||||||
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
|
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
|
||||||
new EdgeId(new UUID(proto.getEdgeIdMSB(), proto.getEdgeIdLSB())),
|
new EdgeId(new UUID(proto.getEdgeIdMSB(), proto.getEdgeIdLSB())),
|
||||||
proto.getSuccess()
|
proto.getSuccess(),
|
||||||
|
proto.getError()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1148,6 +1148,7 @@ message FromEdgeSyncResponseMsgProto {
|
|||||||
int64 edgeIdMSB = 5;
|
int64 edgeIdMSB = 5;
|
||||||
int64 edgeIdLSB = 6;
|
int64 edgeIdLSB = 6;
|
||||||
bool success = 7;
|
bool success = 7;
|
||||||
|
string error = 8;
|
||||||
}
|
}
|
||||||
|
|
||||||
message DeviceEdgeUpdateMsgProto {
|
message DeviceEdgeUpdateMsgProto {
|
||||||
|
|||||||
@ -124,7 +124,7 @@ class ProtoUtilsTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
void protoFromEdgeSyncResponseSerialization() {
|
void protoFromEdgeSyncResponseSerialization() {
|
||||||
FromEdgeSyncResponse msg = new FromEdgeSyncResponse(id, tenantId, edgeId, true);
|
FromEdgeSyncResponse msg = new FromEdgeSyncResponse(id, tenantId, edgeId, true, null);
|
||||||
assertThat(ProtoUtils.fromProto(ProtoUtils.toProto(msg))).as("deserialized").isEqualTo(msg);
|
assertThat(ProtoUtils.fromProto(ProtoUtils.toProto(msg))).as("deserialized").isEqualTo(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2950,6 +2950,10 @@ public class RestClient implements Closeable {
|
|||||||
return restTemplate.getForEntity(baseURL + "/api/edges/enabled", Boolean.class).getBody();
|
return restTemplate.getForEntity(baseURL + "/api/edges/enabled", Boolean.class).getBody();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Boolean isEdgeSyncProcessActive(EdgeId edgeId) {
|
||||||
|
return restTemplate.getForEntity(baseURL + "/api/edge/sync/" + edgeId.getId() + "/active", Boolean.class).getBody();
|
||||||
|
}
|
||||||
|
|
||||||
public Edge saveEdge(Edge edge) {
|
public Edge saveEdge(Edge edge) {
|
||||||
return restTemplate.postForEntity(baseURL + "/api/edge", edge, Edge.class).getBody();
|
return restTemplate.postForEntity(baseURL + "/api/edge", edge, Edge.class).getBody();
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user