Merge pull request #7982 from volodymyr-babak/edge/processors-refactoring
[3.5] Edge processors refactoring - move common part of edge / server to base classes
This commit is contained in:
commit
45e93cfc79
@ -41,22 +41,22 @@ import org.thingsboard.server.dao.edge.EdgeEventService;
|
||||
import org.thingsboard.server.dao.edge.EdgeService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.AlarmEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.AssetEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.AssetProfileEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.CustomerEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.DashboardEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.DeviceEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.DeviceProfileEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.EntityViewEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.OtaPackageEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.QueueEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.RelationEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.RuleChainEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.UserEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.WidgetBundleEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.WidgetTypeEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.asset.AssetProfileEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.customer.CustomerEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.dashboard.DashboardEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.device.DeviceEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.device.DeviceProfileEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.edge.EdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.ota.OtaPackageEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.queue.QueueEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.relation.RelationEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.rule.RuleChainEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.user.UserEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.widget.WidgetBundleEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.widget.WidgetTypeEdgeProcessor;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
|
||||
@ -40,24 +40,24 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings;
|
||||
import org.thingsboard.server.service.edge.rpc.constructor.EdgeMsgConstructor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.AdminSettingsEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.AlarmEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.AssetEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.AssetProfileEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.CustomerEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.DashboardEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.DeviceEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.DeviceProfileEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.EntityViewEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.OtaPackageEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.QueueEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.RelationEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.RuleChainEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.TelemetryEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.UserEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.WidgetBundleEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.WidgetTypeEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.settings.AdminSettingsEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.asset.AssetProfileEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.customer.CustomerEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.dashboard.DashboardEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.device.DeviceEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.device.DeviceProfileEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.edge.EdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.entityview.EntityViewEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.ota.OtaPackageEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.queue.QueueEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.relation.RelationEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.rule.RuleChainEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.telemetry.TelemetryEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.user.UserEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.widget.WidgetBundleEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.widget.WidgetTypeEdgeProcessor;
|
||||
import org.thingsboard.server.service.edge.rpc.sync.EdgeRequestsService;
|
||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||
import org.thingsboard.server.service.executors.GrpcCallbackExecutorService;
|
||||
|
||||
@ -457,7 +457,6 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
case ASSIGNED_TO_CUSTOMER:
|
||||
case UNASSIGNED_FROM_CUSTOMER:
|
||||
case CREDENTIALS_REQUEST:
|
||||
case ENTITY_MERGE_REQUEST:
|
||||
case RPC_CALL:
|
||||
downlinkMsg = convertEntityEventToDownlink(edgeEvent);
|
||||
log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg);
|
||||
@ -556,27 +555,27 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
try {
|
||||
if (uplinkMsg.getEntityDataCount() > 0) {
|
||||
for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) {
|
||||
result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), entityData));
|
||||
result.addAll(ctx.getTelemetryProcessor().processTelemetryMsg(edge.getTenantId(), entityData));
|
||||
}
|
||||
}
|
||||
if (uplinkMsg.getDeviceUpdateMsgCount() > 0) {
|
||||
for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) {
|
||||
result.add(ctx.getDeviceProcessor().processDeviceFromEdge(edge.getTenantId(), edge, deviceUpdateMsg));
|
||||
result.add(ctx.getDeviceProcessor().processDeviceMsgFromEdge(edge.getTenantId(), edge, deviceUpdateMsg));
|
||||
}
|
||||
}
|
||||
if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) {
|
||||
for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) {
|
||||
result.add(ctx.getDeviceProcessor().processDeviceCredentialsFromEdge(edge.getTenantId(), deviceCredentialsUpdateMsg));
|
||||
result.add(ctx.getDeviceProcessor().processDeviceCredentialsMsg(edge.getTenantId(), deviceCredentialsUpdateMsg));
|
||||
}
|
||||
}
|
||||
if (uplinkMsg.getAlarmUpdateMsgCount() > 0) {
|
||||
for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) {
|
||||
result.add(ctx.getAlarmProcessor().processAlarmFromEdge(edge.getTenantId(), alarmUpdateMsg));
|
||||
result.add(ctx.getAlarmProcessor().processAlarmMsg(edge.getTenantId(), alarmUpdateMsg));
|
||||
}
|
||||
}
|
||||
if (uplinkMsg.getRelationUpdateMsgCount() > 0) {
|
||||
for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) {
|
||||
result.add(ctx.getRelationProcessor().processRelationFromEdge(edge.getTenantId(), relationUpdateMsg));
|
||||
result.add(ctx.getRelationProcessor().processRelationMsg(edge.getTenantId(), relationUpdateMsg));
|
||||
}
|
||||
}
|
||||
if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) {
|
||||
|
||||
@ -41,7 +41,7 @@ public class DeviceMsgConstructor {
|
||||
@Autowired
|
||||
private DataDecodingEncodingService dataDecodingEncodingService;
|
||||
|
||||
public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device, String conflictName) {
|
||||
public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device) {
|
||||
DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder()
|
||||
.setMsgType(msgType)
|
||||
.setIdMSB(device.getId().getId().getMostSignificantBits())
|
||||
@ -70,9 +70,6 @@ public class DeviceMsgConstructor {
|
||||
builder.setSoftwareIdMSB(device.getSoftwareId().getId().getMostSignificantBits())
|
||||
.setSoftwareIdLSB(device.getSoftwareId().getId().getLeastSignificantBits());
|
||||
}
|
||||
if (conflictName != null) {
|
||||
builder.setConflictName(conflictName);
|
||||
}
|
||||
if (device.getDeviceData() != null) {
|
||||
builder.setDeviceDataBytes(ByteString.copyFrom(dataDecodingEncodingService.encode(device.getDeviceData())));
|
||||
}
|
||||
|
||||
@ -1,195 +0,0 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmStatus;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.AlarmId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@TbCoreComponent
|
||||
public class AlarmEdgeProcessor extends BaseEdgeProcessor {
|
||||
|
||||
public ListenableFuture<Void> processAlarmFromEdge(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) {
|
||||
log.trace("[{}] processAlarmFromEdge [{}]", tenantId, alarmUpdateMsg);
|
||||
EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(),
|
||||
EntityType.valueOf(alarmUpdateMsg.getOriginatorType()));
|
||||
if (originatorId == null) {
|
||||
log.warn("Originator not found for the alarm msg {}", alarmUpdateMsg);
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
try {
|
||||
Alarm existentAlarm = alarmService.findLatestByOriginatorAndType(tenantId, originatorId, alarmUpdateMsg.getType()).get();
|
||||
switch (alarmUpdateMsg.getMsgType()) {
|
||||
case ENTITY_CREATED_RPC_MESSAGE:
|
||||
case ENTITY_UPDATED_RPC_MESSAGE:
|
||||
if (existentAlarm == null || existentAlarm.getStatus().isCleared()) {
|
||||
existentAlarm = new Alarm();
|
||||
existentAlarm.setTenantId(tenantId);
|
||||
existentAlarm.setType(alarmUpdateMsg.getName());
|
||||
existentAlarm.setOriginator(originatorId);
|
||||
existentAlarm.setSeverity(AlarmSeverity.valueOf(alarmUpdateMsg.getSeverity()));
|
||||
existentAlarm.setStartTs(alarmUpdateMsg.getStartTs());
|
||||
existentAlarm.setClearTs(alarmUpdateMsg.getClearTs());
|
||||
existentAlarm.setPropagate(alarmUpdateMsg.getPropagate());
|
||||
}
|
||||
existentAlarm.setStatus(AlarmStatus.valueOf(alarmUpdateMsg.getStatus()));
|
||||
existentAlarm.setAckTs(alarmUpdateMsg.getAckTs());
|
||||
existentAlarm.setEndTs(alarmUpdateMsg.getEndTs());
|
||||
existentAlarm.setDetails(JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails()));
|
||||
alarmService.createOrUpdateAlarm(existentAlarm);
|
||||
break;
|
||||
case ALARM_ACK_RPC_MESSAGE:
|
||||
if (existentAlarm != null) {
|
||||
alarmService.ackAlarm(tenantId, existentAlarm.getId(), alarmUpdateMsg.getAckTs());
|
||||
}
|
||||
break;
|
||||
case ALARM_CLEAR_RPC_MESSAGE:
|
||||
if (existentAlarm != null) {
|
||||
alarmService.clearAlarm(tenantId, existentAlarm.getId(),
|
||||
JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails()), alarmUpdateMsg.getAckTs());
|
||||
}
|
||||
break;
|
||||
case ENTITY_DELETED_RPC_MESSAGE:
|
||||
if (existentAlarm != null) {
|
||||
alarmService.deleteAlarm(tenantId, existentAlarm.getId());
|
||||
}
|
||||
break;
|
||||
}
|
||||
return Futures.immediateFuture(null);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to process alarm update msg [{}]", alarmUpdateMsg, e);
|
||||
return Futures.immediateFailedFuture(new RuntimeException("Failed to process alarm update msg", e));
|
||||
}
|
||||
}
|
||||
|
||||
private EntityId getAlarmOriginator(TenantId tenantId, String entityName, EntityType entityType) {
|
||||
switch (entityType) {
|
||||
case DEVICE:
|
||||
return deviceService.findDeviceByTenantIdAndName(tenantId, entityName).getId();
|
||||
case ASSET:
|
||||
return assetService.findAssetByTenantIdAndName(tenantId, entityName).getId();
|
||||
case ENTITY_VIEW:
|
||||
return entityViewService.findEntityViewByTenantIdAndName(tenantId, entityName).getId();
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent) {
|
||||
AlarmId alarmId = new AlarmId(edgeEvent.getEntityId());
|
||||
DownlinkMsg downlinkMsg = null;
|
||||
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
||||
switch (edgeEvent.getAction()) {
|
||||
case ADDED:
|
||||
case UPDATED:
|
||||
case ALARM_ACK:
|
||||
case ALARM_CLEAR:
|
||||
try {
|
||||
Alarm alarm = alarmService.findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId).get();
|
||||
if (alarm != null) {
|
||||
downlinkMsg = DownlinkMsg.newBuilder()
|
||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||
.addAlarmUpdateMsg(alarmMsgConstructor.constructAlarmUpdatedMsg(edgeEvent.getTenantId(), msgType, alarm))
|
||||
.build();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Can't process alarm msg [{}] [{}]", edgeEvent, msgType, e);
|
||||
}
|
||||
break;
|
||||
case DELETED:
|
||||
Alarm alarm = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), Alarm.class);
|
||||
AlarmUpdateMsg alarmUpdateMsg =
|
||||
alarmMsgConstructor.constructAlarmUpdatedMsg(edgeEvent.getTenantId(), msgType, alarm);
|
||||
downlinkMsg = DownlinkMsg.newBuilder()
|
||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||
.addAlarmUpdateMsg(alarmUpdateMsg)
|
||||
.build();
|
||||
break;
|
||||
}
|
||||
return downlinkMsg;
|
||||
}
|
||||
|
||||
public ListenableFuture<Void> processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
|
||||
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
||||
AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
||||
switch (actionType) {
|
||||
case DELETED:
|
||||
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
|
||||
Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class);
|
||||
return saveEdgeEvent(tenantId, edgeId, EdgeEventType.ALARM, actionType, alarmId, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm));
|
||||
default:
|
||||
ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
|
||||
return Futures.transformAsync(alarmFuture, alarm -> {
|
||||
if (alarm == null) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType());
|
||||
if (type == null) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<EdgeId> pageData;
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
do {
|
||||
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), pageLink);
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
for (EdgeId relatedEdgeId : pageData.getData()) {
|
||||
futures.add(saveEdgeEvent(tenantId,
|
||||
relatedEdgeId,
|
||||
EdgeEventType.ALARM,
|
||||
EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
||||
alarmId,
|
||||
null));
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
}
|
||||
}
|
||||
} while (pageData != null && pageData.hasNext());
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}, dbCallbackExecutorService);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -97,10 +97,14 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
@Slf4j
|
||||
public abstract class BaseEdgeProcessor {
|
||||
|
||||
protected static final Lock deviceCreationLock = new ReentrantLock();
|
||||
|
||||
protected static final int DEFAULT_PAGE_SIZE = 100;
|
||||
|
||||
@Autowired
|
||||
@ -468,4 +472,17 @@ public abstract class BaseEdgeProcessor {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
protected UUID safeGetUUID(long mSB, long lSB) {
|
||||
return mSB != 0 && lSB != 0 ? new UUID(mSB, lSB) : null;
|
||||
}
|
||||
|
||||
protected CustomerId safeGetCustomerId(long mSB, long lSB) {
|
||||
CustomerId customerId = null;
|
||||
UUID customerUUID = safeGetUUID(mSB, lSB);
|
||||
if (customerUUID != null) {
|
||||
customerId = new CustomerId(customerUUID);
|
||||
}
|
||||
return customerId;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,102 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor.alarm;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.AlarmId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@TbCoreComponent
|
||||
public class AlarmEdgeProcessor extends BaseAlarmProcessor {
|
||||
|
||||
public DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent) {
|
||||
AlarmUpdateMsg alarmUpdateMsg =
|
||||
convertAlarmEventToAlarmMsg(edgeEvent.getTenantId(), edgeEvent.getEntityId(), edgeEvent.getAction(), edgeEvent.getBody());
|
||||
if (alarmUpdateMsg != null) {
|
||||
return DownlinkMsg.newBuilder()
|
||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||
.addAlarmUpdateMsg(alarmUpdateMsg)
|
||||
.build();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public ListenableFuture<Void> processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
|
||||
EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction());
|
||||
AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
|
||||
switch (actionType) {
|
||||
case DELETED:
|
||||
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB()));
|
||||
Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), Alarm.class);
|
||||
return saveEdgeEvent(tenantId, edgeId, EdgeEventType.ALARM, actionType, alarmId, JacksonUtil.OBJECT_MAPPER.valueToTree(deletedAlarm));
|
||||
default:
|
||||
ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId);
|
||||
return Futures.transformAsync(alarmFuture, alarm -> {
|
||||
if (alarm == null) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType());
|
||||
if (type == null) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<EdgeId> pageData;
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
do {
|
||||
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator(), pageLink);
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
for (EdgeId relatedEdgeId : pageData.getData()) {
|
||||
futures.add(saveEdgeEvent(tenantId,
|
||||
relatedEdgeId,
|
||||
EdgeEventType.ALARM,
|
||||
EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
||||
alarmId,
|
||||
null));
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
}
|
||||
}
|
||||
} while (pageData != null && pageData.hasNext());
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}, dbCallbackExecutorService);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,127 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor.alarm;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmStatus;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.id.AlarmId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Slf4j
|
||||
public abstract class BaseAlarmProcessor extends BaseEdgeProcessor {
|
||||
|
||||
public ListenableFuture<Void> processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) {
|
||||
log.trace("[{}] processAlarmMsg [{}]", tenantId, alarmUpdateMsg);
|
||||
EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(),
|
||||
EntityType.valueOf(alarmUpdateMsg.getOriginatorType()));
|
||||
if (originatorId == null) {
|
||||
log.warn("Originator not found for the alarm msg {}", alarmUpdateMsg);
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
try {
|
||||
Alarm existentAlarm = alarmService.findLatestByOriginatorAndType(tenantId, originatorId, alarmUpdateMsg.getType()).get();
|
||||
switch (alarmUpdateMsg.getMsgType()) {
|
||||
case ENTITY_CREATED_RPC_MESSAGE:
|
||||
case ENTITY_UPDATED_RPC_MESSAGE:
|
||||
if (existentAlarm == null || existentAlarm.getStatus().isCleared()) {
|
||||
existentAlarm = new Alarm();
|
||||
existentAlarm.setTenantId(tenantId);
|
||||
existentAlarm.setType(alarmUpdateMsg.getName());
|
||||
existentAlarm.setOriginator(originatorId);
|
||||
existentAlarm.setSeverity(AlarmSeverity.valueOf(alarmUpdateMsg.getSeverity()));
|
||||
existentAlarm.setStartTs(alarmUpdateMsg.getStartTs());
|
||||
existentAlarm.setClearTs(alarmUpdateMsg.getClearTs());
|
||||
existentAlarm.setPropagate(alarmUpdateMsg.getPropagate());
|
||||
}
|
||||
existentAlarm.setStatus(AlarmStatus.valueOf(alarmUpdateMsg.getStatus()));
|
||||
existentAlarm.setAckTs(alarmUpdateMsg.getAckTs());
|
||||
existentAlarm.setEndTs(alarmUpdateMsg.getEndTs());
|
||||
existentAlarm.setDetails(JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails()));
|
||||
alarmService.createOrUpdateAlarm(existentAlarm);
|
||||
break;
|
||||
case ALARM_ACK_RPC_MESSAGE:
|
||||
if (existentAlarm != null) {
|
||||
alarmService.ackAlarm(tenantId, existentAlarm.getId(), alarmUpdateMsg.getAckTs());
|
||||
}
|
||||
break;
|
||||
case ALARM_CLEAR_RPC_MESSAGE:
|
||||
if (existentAlarm != null) {
|
||||
alarmService.clearAlarm(tenantId, existentAlarm.getId(),
|
||||
JacksonUtil.OBJECT_MAPPER.readTree(alarmUpdateMsg.getDetails()), alarmUpdateMsg.getAckTs());
|
||||
}
|
||||
break;
|
||||
case ENTITY_DELETED_RPC_MESSAGE:
|
||||
if (existentAlarm != null) {
|
||||
alarmService.deleteAlarm(tenantId, existentAlarm.getId());
|
||||
}
|
||||
break;
|
||||
}
|
||||
return Futures.immediateFuture(null);
|
||||
} catch (Exception e) {
|
||||
log.error("[{}] Failed to process alarm update msg [{}]", tenantId, alarmUpdateMsg, e);
|
||||
return Futures.immediateFailedFuture(e);
|
||||
}
|
||||
}
|
||||
|
||||
private EntityId getAlarmOriginator(TenantId tenantId, String entityName, EntityType entityType) {
|
||||
switch (entityType) {
|
||||
case DEVICE:
|
||||
return deviceService.findDeviceByTenantIdAndName(tenantId, entityName).getId();
|
||||
case ASSET:
|
||||
return assetService.findAssetByTenantIdAndName(tenantId, entityName).getId();
|
||||
case ENTITY_VIEW:
|
||||
return entityViewService.findEntityViewByTenantIdAndName(tenantId, entityName).getId();
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public AlarmUpdateMsg convertAlarmEventToAlarmMsg(TenantId tenantId, UUID entityId, EdgeEventActionType actionType, JsonNode body) {
|
||||
AlarmId alarmId = new AlarmId(entityId);
|
||||
UpdateMsgType msgType = getUpdateMsgType(actionType);
|
||||
switch (actionType) {
|
||||
case ADDED:
|
||||
case UPDATED:
|
||||
case ALARM_ACK:
|
||||
case ALARM_CLEAR:
|
||||
Alarm alarm = alarmService.findAlarmById(tenantId, alarmId);
|
||||
if (alarm != null) {
|
||||
return alarmMsgConstructor.constructAlarmUpdatedMsg(tenantId, msgType, alarm);
|
||||
}
|
||||
break;
|
||||
case DELETED:
|
||||
Alarm deletedAlarm = JacksonUtil.OBJECT_MAPPER.convertValue(body, Alarm.class);
|
||||
return alarmMsgConstructor.constructAlarmUpdatedMsg(tenantId, msgType, deletedAlarm);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.asset;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -30,6 +30,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.asset;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.customer;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
@ -36,6 +36,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.dashboard;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -0,0 +1,134 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor.device;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.util.Pair;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.device.data.DeviceData;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.OtaPackageId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
||||
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
|
||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
|
||||
@Slf4j
|
||||
public abstract class BaseDeviceProcessor extends BaseEdgeProcessor {
|
||||
|
||||
@Autowired
|
||||
protected DataDecodingEncodingService dataDecodingEncodingService;
|
||||
|
||||
protected Pair<Boolean, Boolean> saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, CustomerId customerId) {
|
||||
boolean created = false;
|
||||
boolean deviceNameUpdated = false;
|
||||
deviceCreationLock.lock();
|
||||
try {
|
||||
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
||||
String deviceName = deviceUpdateMsg.getName();
|
||||
if (device == null) {
|
||||
created = true;
|
||||
device = new Device();
|
||||
device.setTenantId(tenantId);
|
||||
device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId()));
|
||||
Device deviceByName = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName);
|
||||
if (deviceByName != null) {
|
||||
deviceName = deviceName + "_" + StringUtils.randomAlphabetic(15);
|
||||
log.warn("Device with name {} already exists. Renaming device name to {}",
|
||||
deviceUpdateMsg.getName(), deviceName);
|
||||
deviceNameUpdated = true;
|
||||
}
|
||||
}
|
||||
device.setName(deviceName);
|
||||
device.setType(deviceUpdateMsg.getType());
|
||||
device.setLabel(deviceUpdateMsg.hasLabel() ? deviceUpdateMsg.getLabel() : null);
|
||||
device.setAdditionalInfo(deviceUpdateMsg.hasAdditionalInfo()
|
||||
? JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo()) : null);
|
||||
|
||||
UUID deviceProfileUUID = safeGetUUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB());
|
||||
device.setDeviceProfileId(deviceProfileUUID != null ? new DeviceProfileId(deviceProfileUUID) : null);
|
||||
|
||||
device.setCustomerId(customerId);
|
||||
|
||||
Optional<DeviceData> deviceDataOpt =
|
||||
dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray());
|
||||
device.setDeviceData(deviceDataOpt.orElse(null));
|
||||
|
||||
UUID firmwareUUID = safeGetUUID(deviceUpdateMsg.getFirmwareIdMSB(), deviceUpdateMsg.getFirmwareIdLSB());
|
||||
device.setFirmwareId(firmwareUUID != null ? new OtaPackageId(firmwareUUID) : null);
|
||||
|
||||
UUID softwareUUID = safeGetUUID(deviceUpdateMsg.getSoftwareIdMSB(), deviceUpdateMsg.getSoftwareIdLSB());
|
||||
device.setSoftwareId(softwareUUID != null ? new OtaPackageId(softwareUUID) : null);
|
||||
deviceValidator.validate(device, Device::getTenantId);
|
||||
if (created) {
|
||||
device.setId(deviceId);
|
||||
}
|
||||
Device savedDevice = deviceService.saveDevice(device, false);
|
||||
if (created) {
|
||||
DeviceCredentials deviceCredentials = new DeviceCredentials();
|
||||
deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId()));
|
||||
deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
|
||||
deviceCredentials.setCredentialsId(StringUtils.randomAlphanumeric(20));
|
||||
deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials);
|
||||
}
|
||||
tbClusterService.onDeviceUpdated(savedDevice, created ? null : device, false);
|
||||
} finally {
|
||||
deviceCreationLock.unlock();
|
||||
}
|
||||
return Pair.of(created, deviceNameUpdated);
|
||||
}
|
||||
|
||||
public ListenableFuture<Void> processDeviceCredentialsMsg(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
|
||||
log.debug("[{}] Executing processDeviceCredentialsMsg, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
|
||||
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));
|
||||
ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId);
|
||||
return Futures.transform(deviceFuture, device -> {
|
||||
if (device != null) {
|
||||
log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]",
|
||||
device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue());
|
||||
try {
|
||||
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId());
|
||||
deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType()));
|
||||
deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId());
|
||||
deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.hasCredentialsValue()
|
||||
? deviceCredentialsUpdateMsg.getCredentialsValue() : null);
|
||||
deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials);
|
||||
} catch (Exception e) {
|
||||
log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]",
|
||||
device.getName(), deviceCredentialsUpdateMsg, e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
log.warn("Can't find device by id [{}], deviceCredentialsUpdateMsg [{}]", deviceId, deviceCredentialsUpdateMsg);
|
||||
}
|
||||
return null;
|
||||
}, dbCallbackExecutorService);
|
||||
}
|
||||
}
|
||||
@ -13,16 +13,15 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.device;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.util.Pair;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
@ -30,24 +29,19 @@ import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.device.data.DeviceData;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
import org.thingsboard.server.common.data.rpc.RpcError;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgDataType;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
@ -63,215 +57,57 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.TbQueueCallback;
|
||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@TbCoreComponent
|
||||
public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||
public class DeviceEdgeProcessor extends BaseDeviceProcessor {
|
||||
|
||||
@Autowired
|
||||
private DataDecodingEncodingService dataDecodingEncodingService;
|
||||
|
||||
private static final ReentrantLock deviceCreationLock = new ReentrantLock();
|
||||
|
||||
public ListenableFuture<Void> processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
||||
log.trace("[{}] processDeviceFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
|
||||
switch (deviceUpdateMsg.getMsgType()) {
|
||||
case ENTITY_CREATED_RPC_MESSAGE:
|
||||
String deviceName = deviceUpdateMsg.getName();
|
||||
Device device = deviceService.findDeviceByTenantIdAndName(tenantId, deviceName);
|
||||
if (device != null) {
|
||||
boolean deviceAlreadyExistsForThisEdge = isDeviceAlreadyExistsOnCloudForThisEdge(tenantId, edge, device);
|
||||
if (deviceAlreadyExistsForThisEdge) {
|
||||
log.info("[{}] Device with name '{}' already exists on the cloud, and related to this edge [{}]. " +
|
||||
"deviceUpdateMsg [{}], Updating device", tenantId, deviceName, edge.getId(), deviceUpdateMsg);
|
||||
return updateDevice(tenantId, edge, deviceUpdateMsg);
|
||||
} else {
|
||||
log.info("[{}] Device with name '{}' already exists on the cloud, but not related to this edge [{}]. deviceUpdateMsg [{}]." +
|
||||
"Creating a new device with random prefix and relate to this edge", tenantId, deviceName, edge.getId(), deviceUpdateMsg);
|
||||
String newDeviceName = deviceUpdateMsg.getName() + "_" + StringUtils.randomAlphabetic(15);
|
||||
Device newDevice;
|
||||
try {
|
||||
newDevice = createDevice(tenantId, edge, deviceUpdateMsg, newDeviceName);
|
||||
} catch (DataValidationException e) {
|
||||
log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e);
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
ObjectNode body = JacksonUtil.OBJECT_MAPPER.createObjectNode();
|
||||
body.put("conflictName", deviceName);
|
||||
ListenableFuture<Void> input = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ENTITY_MERGE_REQUEST, newDevice.getId(), body);
|
||||
return Futures.transformAsync(input, unused ->
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, newDevice.getId(), null),
|
||||
dbCallbackExecutorService);
|
||||
}
|
||||
} else {
|
||||
log.info("[{}] Creating new device on the cloud [{}]", tenantId, deviceUpdateMsg);
|
||||
try {
|
||||
device = createDevice(tenantId, edge, deviceUpdateMsg, deviceUpdateMsg.getName());
|
||||
} catch (DataValidationException e) {
|
||||
log.error("[{}] Device update msg can't be processed due to data validation [{}]", tenantId, deviceUpdateMsg, e);
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, device.getId(), null);
|
||||
}
|
||||
case ENTITY_UPDATED_RPC_MESSAGE:
|
||||
return updateDevice(tenantId, edge, deviceUpdateMsg);
|
||||
case ENTITY_DELETED_RPC_MESSAGE:
|
||||
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
||||
Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId);
|
||||
if (deviceToDelete != null) {
|
||||
deviceService.unassignDeviceFromEdge(tenantId, deviceId, edge.getId());
|
||||
}
|
||||
return Futures.immediateFuture(null);
|
||||
case UNRECOGNIZED:
|
||||
default:
|
||||
return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isDeviceAlreadyExistsOnCloudForThisEdge(TenantId tenantId, Edge edge, Device device) {
|
||||
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
|
||||
PageData<EdgeId> pageData;
|
||||
do {
|
||||
pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, device.getId(), pageLink);
|
||||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
|
||||
if (pageData.getData().contains(edge.getId())) {
|
||||
return true;
|
||||
}
|
||||
if (pageData.hasNext()) {
|
||||
pageLink = pageLink.nextPageLink();
|
||||
}
|
||||
}
|
||||
} while (pageData != null && pageData.hasNext());
|
||||
return false;
|
||||
}
|
||||
|
||||
public ListenableFuture<Void> processDeviceCredentialsFromEdge(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
|
||||
log.debug("[{}] Executing processDeviceCredentialsFromEdge, deviceCredentialsUpdateMsg [{}]", tenantId, deviceCredentialsUpdateMsg);
|
||||
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));
|
||||
ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId);
|
||||
return Futures.transform(deviceFuture, device -> {
|
||||
if (device != null) {
|
||||
log.debug("Updating device credentials for device [{}]. New device credentials Id [{}], value [{}]",
|
||||
device.getName(), deviceCredentialsUpdateMsg.getCredentialsId(), deviceCredentialsUpdateMsg.getCredentialsValue());
|
||||
try {
|
||||
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(tenantId, device.getId());
|
||||
deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(deviceCredentialsUpdateMsg.getCredentialsType()));
|
||||
deviceCredentials.setCredentialsId(deviceCredentialsUpdateMsg.getCredentialsId());
|
||||
if (deviceCredentialsUpdateMsg.hasCredentialsValue()) {
|
||||
deviceCredentials.setCredentialsValue(deviceCredentialsUpdateMsg.getCredentialsValue());
|
||||
}
|
||||
deviceCredentialsService.updateDeviceCredentials(tenantId, deviceCredentials);
|
||||
} catch (Exception e) {
|
||||
log.error("Can't update device credentials for device [{}], deviceCredentialsUpdateMsg [{}]", device.getName(), deviceCredentialsUpdateMsg, e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}, dbCallbackExecutorService);
|
||||
}
|
||||
|
||||
|
||||
private ListenableFuture<Void> updateDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
||||
public ListenableFuture<Void> processDeviceMsgFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
|
||||
log.trace("[{}] executing processDeviceMsgFromEdge [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
|
||||
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
||||
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
||||
if (device != null) {
|
||||
device.setName(deviceUpdateMsg.getName());
|
||||
device.setType(deviceUpdateMsg.getType());
|
||||
if (deviceUpdateMsg.hasLabel()) {
|
||||
device.setLabel(deviceUpdateMsg.getLabel());
|
||||
}
|
||||
if (deviceUpdateMsg.hasAdditionalInfo()) {
|
||||
device.setAdditionalInfo(JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo()));
|
||||
}
|
||||
if (deviceUpdateMsg.hasDeviceProfileIdMSB() && deviceUpdateMsg.hasDeviceProfileIdLSB()) {
|
||||
DeviceProfileId deviceProfileId = new DeviceProfileId(
|
||||
new UUID(deviceUpdateMsg.getDeviceProfileIdMSB(),
|
||||
deviceUpdateMsg.getDeviceProfileIdLSB()));
|
||||
device.setDeviceProfileId(deviceProfileId);
|
||||
}
|
||||
device.setCustomerId(getCustomerId(deviceUpdateMsg));
|
||||
Optional<DeviceData> deviceDataOpt =
|
||||
dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray());
|
||||
deviceDataOpt.ifPresent(device::setDeviceData);
|
||||
Device savedDevice = deviceService.saveDevice(device);
|
||||
tbClusterService.onDeviceUpdated(savedDevice, device, false);
|
||||
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null);
|
||||
} else {
|
||||
String errMsg = String.format("[%s] can't find device [%s], edge [%s]", tenantId, deviceUpdateMsg, edge.getId());
|
||||
log.warn(errMsg);
|
||||
return Futures.immediateFailedFuture(new RuntimeException(errMsg));
|
||||
}
|
||||
}
|
||||
|
||||
private Device createDevice(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg, String deviceName) {
|
||||
Device device;
|
||||
deviceCreationLock.lock();
|
||||
try {
|
||||
log.debug("[{}] Creating device entity [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
|
||||
DeviceId deviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
|
||||
device = deviceService.findDeviceById(tenantId, deviceId);
|
||||
boolean created = false;
|
||||
if (device == null) {
|
||||
device = new Device();
|
||||
device.setTenantId(tenantId);
|
||||
device.setCreatedTime(Uuids.unixTimestamp(deviceId.getId()));
|
||||
created = true;
|
||||
switch (deviceUpdateMsg.getMsgType()) {
|
||||
case ENTITY_CREATED_RPC_MESSAGE:
|
||||
case ENTITY_UPDATED_RPC_MESSAGE:
|
||||
saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, edge);
|
||||
return saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_REQUEST, deviceId, null);
|
||||
case ENTITY_DELETED_RPC_MESSAGE:
|
||||
Device deviceToDelete = deviceService.findDeviceById(tenantId, deviceId);
|
||||
if (deviceToDelete != null) {
|
||||
deviceService.unassignDeviceFromEdge(tenantId, deviceId, edge.getId());
|
||||
}
|
||||
return Futures.immediateFuture(null);
|
||||
case UNRECOGNIZED:
|
||||
default:
|
||||
return handleUnsupportedMsgType(deviceUpdateMsg.getMsgType());
|
||||
}
|
||||
device.setName(deviceName);
|
||||
device.setType(deviceUpdateMsg.getType());
|
||||
if (deviceUpdateMsg.hasLabel()) {
|
||||
device.setLabel(deviceUpdateMsg.getLabel());
|
||||
} catch (DataValidationException e) {
|
||||
if (e.getMessage().contains("Can't create more then")) {
|
||||
log.warn("[{}] Number of allowed devices violated {}", tenantId, deviceUpdateMsg, e);
|
||||
return Futures.immediateFuture(null);
|
||||
} else {
|
||||
return Futures.immediateFailedFuture(e);
|
||||
}
|
||||
if (deviceUpdateMsg.hasAdditionalInfo()) {
|
||||
device.setAdditionalInfo(JacksonUtil.toJsonNode(deviceUpdateMsg.getAdditionalInfo()));
|
||||
}
|
||||
if (deviceUpdateMsg.hasDeviceProfileIdMSB() && deviceUpdateMsg.hasDeviceProfileIdLSB()) {
|
||||
DeviceProfileId deviceProfileId = new DeviceProfileId(
|
||||
new UUID(deviceUpdateMsg.getDeviceProfileIdMSB(),
|
||||
deviceUpdateMsg.getDeviceProfileIdLSB()));
|
||||
device.setDeviceProfileId(deviceProfileId);
|
||||
}
|
||||
device.setCustomerId(getCustomerId(deviceUpdateMsg));
|
||||
Optional<DeviceData> deviceDataOpt =
|
||||
dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray());
|
||||
if (deviceDataOpt.isPresent()) {
|
||||
device.setDeviceData(deviceDataOpt.get());
|
||||
}
|
||||
deviceValidator.validate(device, Device::getTenantId);
|
||||
if (created) {
|
||||
device.setId(deviceId);
|
||||
}
|
||||
Device savedDevice = deviceService.saveDevice(device, false);
|
||||
tbClusterService.onDeviceUpdated(savedDevice, created ? null : device, false);
|
||||
if (created) {
|
||||
DeviceCredentials deviceCredentials = new DeviceCredentials();
|
||||
deviceCredentials.setDeviceId(new DeviceId(savedDevice.getUuidId()));
|
||||
deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN);
|
||||
deviceCredentials.setCredentialsId(StringUtils.randomAlphanumeric(20));
|
||||
deviceCredentialsService.createDeviceCredentials(device.getTenantId(), deviceCredentials);
|
||||
}
|
||||
createRelationFromEdge(tenantId, edge.getId(), device.getId());
|
||||
pushDeviceCreatedEventToRuleEngine(tenantId, edge, device);
|
||||
deviceService.assignDeviceToEdge(edge.getTenantId(), device.getId(), edge.getId());
|
||||
} finally {
|
||||
deviceCreationLock.unlock();
|
||||
}
|
||||
return device;
|
||||
}
|
||||
|
||||
private CustomerId getCustomerId(DeviceUpdateMsg deviceUpdateMsg) {
|
||||
if (deviceUpdateMsg.hasCustomerIdMSB() && deviceUpdateMsg.hasCustomerIdLSB()) {
|
||||
return new CustomerId(new UUID(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB()));
|
||||
} else {
|
||||
return null;
|
||||
private void saveOrUpdateDevice(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg, Edge edge) {
|
||||
CustomerId customerId = safeGetCustomerId(deviceUpdateMsg.getCustomerIdMSB(), deviceUpdateMsg.getCustomerIdLSB());
|
||||
Pair<Boolean, Boolean> resultPair = super.saveOrUpdateDevice(tenantId, deviceId, deviceUpdateMsg, customerId);
|
||||
Boolean created = resultPair.getFirst();
|
||||
if (created) {
|
||||
createRelationFromEdge(tenantId, edge.getId(), deviceId);
|
||||
pushDeviceCreatedEventToRuleEngine(tenantId, edge, deviceId);
|
||||
deviceService.assignDeviceToEdge(tenantId, deviceId, edge.getId());
|
||||
}
|
||||
Boolean deviceNameUpdated = resultPair.getSecond();
|
||||
if (deviceNameUpdated) {
|
||||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.UPDATED, deviceId, null);
|
||||
}
|
||||
}
|
||||
|
||||
@ -284,9 +120,9 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||
relationService.saveRelation(tenantId, relation);
|
||||
}
|
||||
|
||||
private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, Device device) {
|
||||
private void pushDeviceCreatedEventToRuleEngine(TenantId tenantId, Edge edge, DeviceId deviceId) {
|
||||
try {
|
||||
DeviceId deviceId = device.getId();
|
||||
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
||||
ObjectNode entityNode = JacksonUtil.OBJECT_MAPPER.valueToTree(device);
|
||||
TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, device.getCustomerId(),
|
||||
getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, JacksonUtil.OBJECT_MAPPER.writeValueAsString(entityNode));
|
||||
@ -302,7 +138,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||
}
|
||||
});
|
||||
} catch (JsonProcessingException | IllegalArgumentException e) {
|
||||
log.warn("[{}] Failed to push device action to rule engine: {}", device.getId(), DataConstants.ENTITY_CREATED, e);
|
||||
log.warn("[{}] Failed to push device action to rule engine: {}", deviceId, DataConstants.ENTITY_CREATED, e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -415,7 +251,7 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||
if (device != null) {
|
||||
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
||||
DeviceUpdateMsg deviceUpdateMsg =
|
||||
deviceMsgConstructor.constructDeviceUpdatedMsg(msgType, device, null);
|
||||
deviceMsgConstructor.constructDeviceUpdatedMsg(msgType, device);
|
||||
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
|
||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||
.addDeviceUpdateMsg(deviceUpdateMsg);
|
||||
@ -450,8 +286,6 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||
return convertRpcCallEventToDownlink(edgeEvent);
|
||||
case CREDENTIALS_REQUEST:
|
||||
return convertCredentialsRequestEventToDownlink(edgeEvent);
|
||||
case ENTITY_MERGE_REQUEST:
|
||||
return convertEntityMergeRequestEventToDownlink(edgeEvent);
|
||||
}
|
||||
return downlinkMsg;
|
||||
}
|
||||
@ -475,21 +309,6 @@ public class DeviceEdgeProcessor extends BaseEdgeProcessor {
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public DownlinkMsg convertEntityMergeRequestEventToDownlink(EdgeEvent edgeEvent) {
|
||||
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
|
||||
Device device = deviceService.findDeviceById(edgeEvent.getTenantId(), deviceId);
|
||||
String conflictName = null;
|
||||
if(edgeEvent.getBody() != null) {
|
||||
conflictName = edgeEvent.getBody().get("conflictName").asText();
|
||||
}
|
||||
DeviceUpdateMsg deviceUpdateMsg = deviceMsgConstructor
|
||||
.constructDeviceUpdatedMsg(UpdateMsgType.ENTITY_MERGE_RPC_MESSAGE, device, conflictName);
|
||||
return DownlinkMsg.newBuilder()
|
||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||
.addDeviceUpdateMsg(deviceUpdateMsg)
|
||||
.build();
|
||||
}
|
||||
|
||||
public ListenableFuture<Void> processDeviceNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) {
|
||||
return processEntityNotification(tenantId, edgeNotificationMsg);
|
||||
}
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.device;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.edge;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
@ -35,6 +35,7 @@ import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.EdgeConfiguration;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.entityview;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.EntityViewUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.ota;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.OtaPackageUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.queue;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.QueueUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -13,21 +13,13 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.relation;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
|
||||
import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.DashboardId;
|
||||
@ -40,24 +32,15 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UserId;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@TbCoreComponent
|
||||
public class RelationEdgeProcessor extends BaseEdgeProcessor {
|
||||
public abstract class BaseRelationProcessor extends BaseEdgeProcessor {
|
||||
|
||||
public ListenableFuture<Void> processRelationFromEdge(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) {
|
||||
public ListenableFuture<Void> processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) {
|
||||
log.trace("[{}] processRelationFromEdge [{}]", tenantId, relationUpdateMsg);
|
||||
try {
|
||||
EntityRelation entityRelation = new EntityRelation();
|
||||
@ -71,10 +54,9 @@ public class RelationEdgeProcessor extends BaseEdgeProcessor {
|
||||
entityRelation.setTo(toId);
|
||||
|
||||
entityRelation.setType(relationUpdateMsg.getType());
|
||||
if (relationUpdateMsg.hasTypeGroup()) {
|
||||
entityRelation.setTypeGroup(RelationTypeGroup.valueOf(relationUpdateMsg.getTypeGroup()));
|
||||
}
|
||||
entityRelation.setAdditionalInfo(JacksonUtil.OBJECT_MAPPER.readTree(relationUpdateMsg.getAdditionalInfo()));
|
||||
entityRelation.setTypeGroup(relationUpdateMsg.hasTypeGroup()
|
||||
? RelationTypeGroup.valueOf(relationUpdateMsg.getTypeGroup()) : RelationTypeGroup.COMMON);
|
||||
entityRelation.setAdditionalInfo(JacksonUtil.toJsonNode(relationUpdateMsg.getAdditionalInfo()));
|
||||
switch (relationUpdateMsg.getMsgType()) {
|
||||
case ENTITY_CREATED_RPC_MESSAGE:
|
||||
case ENTITY_UPDATED_RPC_MESSAGE:
|
||||
@ -94,12 +76,12 @@ public class RelationEdgeProcessor extends BaseEdgeProcessor {
|
||||
return handleUnsupportedMsgType(relationUpdateMsg.getMsgType());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to process relation update msg [{}]", relationUpdateMsg, e);
|
||||
return Futures.immediateFailedFuture(new RuntimeException("Failed to process relation update msg", e));
|
||||
log.error("[{}] Failed to process relation update msg [{}]", tenantId, relationUpdateMsg, e);
|
||||
return Futures.immediateFailedFuture(e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isEntityExists(TenantId tenantId, EntityId entityId) throws ThingsboardException {
|
||||
private boolean isEntityExists(TenantId tenantId, EntityId entityId) {
|
||||
switch (entityId.getEntityType()) {
|
||||
case DEVICE:
|
||||
return deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())) != null;
|
||||
@ -113,43 +95,10 @@ public class RelationEdgeProcessor extends BaseEdgeProcessor {
|
||||
return userService.findUserById(tenantId, new UserId(entityId.getId())) != null;
|
||||
case DASHBOARD:
|
||||
return dashboardService.findDashboardById(tenantId, new DashboardId(entityId.getId())) != null;
|
||||
case EDGE:
|
||||
return edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())) != null;
|
||||
default:
|
||||
throw new ThingsboardException("Unsupported entity type " + entityId.getEntityType(), ThingsboardErrorCode.INVALID_ARGUMENTS);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) {
|
||||
EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class);
|
||||
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
||||
RelationUpdateMsg relationUpdateMsg = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation);
|
||||
return DownlinkMsg.newBuilder()
|
||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||
.addRelationUpdateMsg(relationUpdateMsg)
|
||||
.build();
|
||||
}
|
||||
|
||||
public ListenableFuture<Void> processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
|
||||
EntityRelation relation = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), EntityRelation.class);
|
||||
if (relation.getFrom().getEntityType().equals(EntityType.EDGE) ||
|
||||
relation.getTo().getEntityType().equals(EntityType.EDGE)) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
|
||||
Set<EdgeId> uniqueEdgeIds = new HashSet<>();
|
||||
uniqueEdgeIds.addAll(edgeService.findAllRelatedEdgeIds(tenantId, relation.getTo()));
|
||||
uniqueEdgeIds.addAll(edgeService.findAllRelatedEdgeIds(tenantId, relation.getFrom()));
|
||||
if (uniqueEdgeIds.isEmpty()) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
for (EdgeId edgeId : uniqueEdgeIds) {
|
||||
futures.add(saveEdgeEvent(tenantId,
|
||||
edgeId,
|
||||
EdgeEventType.RELATION,
|
||||
EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
||||
null,
|
||||
JacksonUtil.OBJECT_MAPPER.valueToTree(relation)));
|
||||
}
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,82 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor.relation;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.RelationUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@TbCoreComponent
|
||||
public class RelationEdgeProcessor extends BaseRelationProcessor {
|
||||
|
||||
public DownlinkMsg convertRelationEventToDownlink(EdgeEvent edgeEvent) {
|
||||
EntityRelation entityRelation = JacksonUtil.OBJECT_MAPPER.convertValue(edgeEvent.getBody(), EntityRelation.class);
|
||||
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
|
||||
RelationUpdateMsg relationUpdateMsg = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation);
|
||||
return DownlinkMsg.newBuilder()
|
||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||
.addRelationUpdateMsg(relationUpdateMsg)
|
||||
.build();
|
||||
}
|
||||
|
||||
public ListenableFuture<Void> processRelationNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) throws JsonProcessingException {
|
||||
EntityRelation relation = JacksonUtil.OBJECT_MAPPER.readValue(edgeNotificationMsg.getBody(), EntityRelation.class);
|
||||
if (relation.getFrom().getEntityType().equals(EntityType.EDGE) ||
|
||||
relation.getTo().getEntityType().equals(EntityType.EDGE)) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
|
||||
Set<EdgeId> uniqueEdgeIds = new HashSet<>();
|
||||
uniqueEdgeIds.addAll(edgeService.findAllRelatedEdgeIds(tenantId, relation.getTo()));
|
||||
uniqueEdgeIds.addAll(edgeService.findAllRelatedEdgeIds(tenantId, relation.getFrom()));
|
||||
if (uniqueEdgeIds.isEmpty()) {
|
||||
return Futures.immediateFuture(null);
|
||||
}
|
||||
List<ListenableFuture<Void>> futures = new ArrayList<>();
|
||||
for (EdgeId edgeId : uniqueEdgeIds) {
|
||||
futures.add(saveEdgeEvent(tenantId,
|
||||
edgeId,
|
||||
EdgeEventType.RELATION,
|
||||
EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()),
|
||||
null,
|
||||
JacksonUtil.OBJECT_MAPPER.valueToTree(relation)));
|
||||
}
|
||||
return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService);
|
||||
}
|
||||
}
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.rule;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -31,6 +31,7 @@ import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
import static org.thingsboard.server.service.edge.DefaultEdgeNotificationService.EDGE_IS_ROOT_BODY_KEY;
|
||||
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.settings;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -13,9 +13,10 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.telemetry;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
@ -26,19 +27,16 @@ import com.google.gson.JsonParser;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.EntityView;
|
||||
import org.thingsboard.server.common.data.asset.Asset;
|
||||
import org.thingsboard.server.common.data.asset.AssetProfile;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.id.AssetId;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
@ -60,24 +58,22 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter;
|
||||
import org.thingsboard.server.common.transport.util.JsonUtils;
|
||||
import org.thingsboard.server.dao.model.ModelConstants;
|
||||
import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.TbQueueCallback;
|
||||
import org.thingsboard.server.queue.TbQueueMsgMetadata;
|
||||
import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@TbCoreComponent
|
||||
public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
|
||||
|
||||
private final Gson gson = new Gson();
|
||||
|
||||
@ -88,15 +84,17 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
|
||||
}
|
||||
|
||||
public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, EntityDataProto entityData) {
|
||||
log.trace("[{}] processTelemetryFromEdge [{}]", tenantId, entityData);
|
||||
abstract protected String getMsgSourceKey();
|
||||
|
||||
public List<ListenableFuture<Void>> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) {
|
||||
log.trace("[{}] processTelemetryMsg [{}]", tenantId, entityData);
|
||||
List<ListenableFuture<Void>> result = new ArrayList<>();
|
||||
EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB());
|
||||
if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) {
|
||||
Pair<TbMsgMetaData, CustomerId> pair = getBaseMsgMetadataAndCustomerId(tenantId, entityId);
|
||||
TbMsgMetaData metaData = pair.getKey();
|
||||
CustomerId customerId = pair.getValue();
|
||||
metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE);
|
||||
metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey());
|
||||
if (entityData.hasPostAttributesMsg()) {
|
||||
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData));
|
||||
}
|
||||
@ -283,11 +281,11 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
String entityType) {
|
||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||
String scope = attributeDeleteMsg.getScope();
|
||||
List<String> attributeNames = attributeDeleteMsg.getAttributeNamesList();
|
||||
attributesService.removeAll(tenantId, entityId, scope, attributeNames);
|
||||
List<String> attributeKeys = attributeDeleteMsg.getAttributeNamesList();
|
||||
attributesService.removeAll(tenantId, entityId, scope, attributeKeys);
|
||||
if (EntityType.DEVICE.name().equals(entityType)) {
|
||||
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
|
||||
tenantId, (DeviceId) entityId, scope, attributeNames), new TbQueueCallback() {
|
||||
tenantId, (DeviceId) entityId, scope, attributeKeys), new TbQueueCallback() {
|
||||
@Override
|
||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||
futureToSet.set(null);
|
||||
@ -303,47 +301,42 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
return futureToSet;
|
||||
}
|
||||
|
||||
public DownlinkMsg convertTelemetryEventToDownlink(EdgeEvent edgeEvent) throws JsonProcessingException {
|
||||
public EntityDataProto convertTelemetryEventToEntityDataProto(EntityType entityType,
|
||||
UUID entityUUID,
|
||||
EdgeEventActionType actionType,
|
||||
JsonNode body) throws JsonProcessingException {
|
||||
EntityId entityId;
|
||||
switch (edgeEvent.getType()) {
|
||||
switch (entityType) {
|
||||
case DEVICE:
|
||||
entityId = new DeviceId(edgeEvent.getEntityId());
|
||||
entityId = new DeviceId(entityUUID);
|
||||
break;
|
||||
case ASSET:
|
||||
entityId = new AssetId(edgeEvent.getEntityId());
|
||||
entityId = new AssetId(entityUUID);
|
||||
break;
|
||||
case ENTITY_VIEW:
|
||||
entityId = new EntityViewId(edgeEvent.getEntityId());
|
||||
entityId = new EntityViewId(entityUUID);
|
||||
break;
|
||||
case DASHBOARD:
|
||||
entityId = new DashboardId(edgeEvent.getEntityId());
|
||||
entityId = new DashboardId(entityUUID);
|
||||
break;
|
||||
case TENANT:
|
||||
entityId = TenantId.fromUUID(edgeEvent.getEntityId());
|
||||
entityId = TenantId.fromUUID(entityUUID);
|
||||
break;
|
||||
case CUSTOMER:
|
||||
entityId = new CustomerId(edgeEvent.getEntityId());
|
||||
entityId = new CustomerId(entityUUID);
|
||||
break;
|
||||
case USER:
|
||||
entityId = new UserId(edgeEvent.getEntityId());
|
||||
entityId = new UserId(entityUUID);
|
||||
break;
|
||||
case EDGE:
|
||||
entityId = new EdgeId(edgeEvent.getEntityId());
|
||||
entityId = new EdgeId(entityUUID);
|
||||
break;
|
||||
default:
|
||||
log.warn("Unsupported edge event type [{}]", edgeEvent);
|
||||
log.warn("Unsupported edge event type [{}]", entityType);
|
||||
return null;
|
||||
}
|
||||
return constructEntityDataProtoMsg(entityId, edgeEvent.getAction(),
|
||||
JsonParser.parseString(JacksonUtil.OBJECT_MAPPER.writeValueAsString(edgeEvent.getBody())));
|
||||
}
|
||||
|
||||
private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) {
|
||||
EntityDataProto entityDataProto = entityDataMsgConstructor.constructEntityDataMsg(entityId, actionType, entityData);
|
||||
return DownlinkMsg.newBuilder()
|
||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||
.addEntityData(entityDataProto)
|
||||
.build();
|
||||
JsonElement entityData = JsonParser.parseString(JacksonUtil.OBJECT_MAPPER.writeValueAsString(body));
|
||||
return entityDataMsgConstructor.constructEntityDataMsg(entityId, actionType, entityData);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,48 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor.telemetry;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.EdgeUtils;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.EntityDataProto;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@TbCoreComponent
|
||||
public class TelemetryEdgeProcessor extends BaseTelemetryProcessor {
|
||||
|
||||
@Override
|
||||
protected String getMsgSourceKey() {
|
||||
return DataConstants.EDGE_MSG_SOURCE;
|
||||
}
|
||||
|
||||
public DownlinkMsg convertTelemetryEventToDownlink(EdgeEvent edgeEvent) throws JsonProcessingException {
|
||||
EntityType entityType = EntityType.valueOf(edgeEvent.getType().name());
|
||||
EntityDataProto entityDataProto = convertTelemetryEventToEntityDataProto(entityType, edgeEvent.getEntityId(),
|
||||
edgeEvent.getAction(), edgeEvent.getBody());
|
||||
return DownlinkMsg.newBuilder()
|
||||
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
|
||||
.addEntityData(entityDataProto)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.user;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -29,6 +29,7 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.edge.v1.UserCredentialsUpdateMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.widget;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.edge.v1.WidgetsBundleUpdateMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.processor;
|
||||
package org.thingsboard.server.service.edge.rpc.processor.widget;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
|
||||
import org.thingsboard.server.gen.edge.v1.WidgetTypeUpdateMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@ -17,6 +17,10 @@ package org.thingsboard.server.controller;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -28,6 +32,7 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||
import org.thingsboard.server.common.data.Customer;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.EntitySubtype;
|
||||
@ -58,13 +63,10 @@ import org.thingsboard.server.gen.edge.v1.RuleChainUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UserCredentialsUpdateMsg;
|
||||
import org.thingsboard.server.gen.edge.v1.UserUpdateMsg;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
@ -86,6 +88,10 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
||||
private TenantId tenantId;
|
||||
private User tenantAdmin;
|
||||
|
||||
ListeningExecutorService executor;
|
||||
|
||||
List<ListenableFuture<Edge>> futures;
|
||||
|
||||
@Autowired
|
||||
private EdgeDao edgeDao;
|
||||
|
||||
@ -99,6 +105,8 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
||||
|
||||
@Before
|
||||
public void beforeTest() throws Exception {
|
||||
executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(8, getClass()));
|
||||
|
||||
loginSysAdmin();
|
||||
|
||||
Tenant tenant = new Tenant();
|
||||
@ -119,6 +127,8 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
||||
|
||||
@After
|
||||
public void afterTest() throws Exception {
|
||||
executor.shutdownNow();
|
||||
|
||||
loginSysAdmin();
|
||||
|
||||
doDelete("/api/tenant/" + savedTenant.getId().getId().toString())
|
||||
@ -385,11 +395,14 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
||||
|
||||
@Test
|
||||
public void testFindTenantEdges() throws Exception {
|
||||
List<Edge> edges = new ArrayList<>();
|
||||
for (int i = 0; i < 178; i++) {
|
||||
int cntEntity = 178;
|
||||
futures = new ArrayList<>(cntEntity);
|
||||
for (int i = 0; i < cntEntity; i++) {
|
||||
Edge edge = constructEdge("Edge" + i, "default");
|
||||
edges.add(doPost("/api/edge", edge, Edge.class));
|
||||
futures.add(executor.submit(() ->
|
||||
doPost("/api/edge", edge, Edge.class)));
|
||||
}
|
||||
List<Edge> edges = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS));
|
||||
List<Edge> loadedEdges = new ArrayList<>();
|
||||
PageLink pageLink = new PageLink(23);
|
||||
PageData<Edge> pageData = null;
|
||||
@ -412,23 +425,30 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void testFindTenantEdgesByName() throws Exception {
|
||||
String title1 = "Edge title 1";
|
||||
List<Edge> edgesTitle1 = new ArrayList<>();
|
||||
for (int i = 0; i < 143; i++) {
|
||||
int cntEntity = 143;
|
||||
futures = new ArrayList<>(cntEntity);
|
||||
for (int i = 0; i < cntEntity; i++) {
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
String name = title1 + suffix;
|
||||
name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase();
|
||||
Edge edge = constructEdge(name, "default");
|
||||
edgesTitle1.add(doPost("/api/edge", edge, Edge.class));
|
||||
futures.add(executor.submit(() ->
|
||||
doPost("/api/edge", edge, Edge.class)));
|
||||
}
|
||||
List<Edge> edgesTitle1 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS));
|
||||
|
||||
String title2 = "Edge title 2";
|
||||
List<Edge> edgesTitle2 = new ArrayList<>();
|
||||
for (int i = 0; i < 75; i++) {
|
||||
cntEntity = 75;
|
||||
futures = new ArrayList<>(cntEntity);
|
||||
for (int i = 0; i < cntEntity; i++) {
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
String name = title2 + suffix;
|
||||
name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase();
|
||||
Edge edge = constructEdge(name, "default");
|
||||
edgesTitle2.add(doPost("/api/edge", edge, Edge.class));
|
||||
futures.add(executor.submit(() ->
|
||||
doPost("/api/edge", edge, Edge.class)));
|
||||
}
|
||||
List<Edge> edgesTitle2 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS));
|
||||
|
||||
List<Edge> loadedEdgesTitle1 = new ArrayList<>();
|
||||
PageLink pageLink = new PageLink(15, 0, title1);
|
||||
@ -494,24 +514,31 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
||||
public void testFindTenantEdgesByType() throws Exception {
|
||||
String title1 = "Edge title 1";
|
||||
String type1 = "typeA";
|
||||
List<Edge> edgesType1 = new ArrayList<>();
|
||||
for (int i = 0; i < 143; i++) {
|
||||
int cntEntity = 143;
|
||||
futures = new ArrayList<>(cntEntity);
|
||||
for (int i = 0; i < cntEntity; i++) {
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
String name = title1 + suffix;
|
||||
name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase();
|
||||
Edge edge = constructEdge(name, type1);
|
||||
edgesType1.add(doPost("/api/edge", edge, Edge.class));
|
||||
futures.add(executor.submit(() ->
|
||||
doPost("/api/edge", edge, Edge.class)));
|
||||
}
|
||||
List<Edge> edgesType1 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS));
|
||||
|
||||
String title2 = "Edge title 2";
|
||||
String type2 = "typeB";
|
||||
List<Edge> edgesType2 = new ArrayList<>();
|
||||
for (int i = 0; i < 75; i++) {
|
||||
cntEntity = 75;
|
||||
futures = new ArrayList<>(cntEntity);
|
||||
for (int i = 0; i < cntEntity; i++) {
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
String name = title2 + suffix;
|
||||
name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase();
|
||||
Edge edge = constructEdge(name, type2);
|
||||
edgesType2.add(doPost("/api/edge", edge, Edge.class));
|
||||
futures.add(executor.submit(() ->
|
||||
doPost("/api/edge", edge, Edge.class)));
|
||||
}
|
||||
List<Edge> edgesType2 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS));
|
||||
|
||||
List<Edge> loadedEdgesType1 = new ArrayList<>();
|
||||
PageLink pageLink = new PageLink(15);
|
||||
@ -582,14 +609,17 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
||||
|
||||
Mockito.reset(tbClusterService, auditLogService);
|
||||
|
||||
List<Edge> edges = new ArrayList<>();
|
||||
int cntEntity = 128;
|
||||
futures = new ArrayList<>(cntEntity);
|
||||
for (int i = 0; i < cntEntity; i++) {
|
||||
Edge edge = constructEdge("Edge" + i, "default");
|
||||
edge = doPost("/api/edge", edge, Edge.class);
|
||||
edges.add(doPost("/api/customer/" + customerId.getId().toString()
|
||||
+ "/edge/" + edge.getId().getId().toString(), Edge.class));
|
||||
futures.add(executor.submit(() -> {
|
||||
Edge edge1 = doPost("/api/edge", edge, Edge.class);
|
||||
return doPost("/api/customer/" + customerId.getId().toString()
|
||||
+ "/edge/" + edge1.getId().getId().toString(), Edge.class);
|
||||
}));
|
||||
}
|
||||
List<Edge> edges = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS));
|
||||
|
||||
testNotifyManyEntityManyTimeMsgToEdgeServiceEntityEqAny(new Edge(), new Edge(),
|
||||
savedTenant.getId(), customerId, tenantAdmin.getId(), tenantAdmin.getEmail(),
|
||||
@ -622,28 +652,37 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
||||
customer = doPost("/api/customer", customer, Customer.class);
|
||||
CustomerId customerId = customer.getId();
|
||||
|
||||
int cntEntity = 125;
|
||||
String title1 = "Edge title 1";
|
||||
List<Edge> edgesTitle1 = new ArrayList<>();
|
||||
for (int i = 0; i < 125; i++) {
|
||||
futures = new ArrayList<>(cntEntity);
|
||||
for (int i = 0; i < cntEntity; i++) {
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
String name = title1 + suffix;
|
||||
name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase();
|
||||
Edge edge = constructEdge(name, "default");
|
||||
edge = doPost("/api/edge", edge, Edge.class);
|
||||
edgesTitle1.add(doPost("/api/customer/" + customerId.getId().toString()
|
||||
+ "/edge/" + edge.getId().getId().toString(), Edge.class));
|
||||
futures.add(executor.submit(() -> {
|
||||
Edge edge1 = doPost("/api/edge", edge, Edge.class);
|
||||
return doPost("/api/customer/" + customerId.getId().toString()
|
||||
+ "/edge/" + edge1.getId().getId().toString(), Edge.class);
|
||||
}));
|
||||
}
|
||||
List<Edge> edgesTitle1 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS));
|
||||
|
||||
cntEntity = 143;
|
||||
String title2 = "Edge title 2";
|
||||
List<Edge> edgesTitle2 = new ArrayList<>();
|
||||
for (int i = 0; i < 143; i++) {
|
||||
futures = new ArrayList<>(cntEntity);
|
||||
for (int i = 0; i < cntEntity; i++) {
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
String name = title2 + suffix;
|
||||
name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase();
|
||||
Edge edge = constructEdge(name, "default");
|
||||
edge = doPost("/api/edge", edge, Edge.class);
|
||||
edgesTitle2.add(doPost("/api/customer/" + customerId.getId().toString()
|
||||
+ "/edge/" + edge.getId().getId().toString(), Edge.class));
|
||||
futures.add(executor.submit(() -> {
|
||||
Edge edge1 = doPost("/api/edge", edge, Edge.class);
|
||||
return doPost("/api/customer/" + customerId.getId().toString()
|
||||
+ "/edge/" + edge1.getId().getId().toString(), Edge.class);
|
||||
}));
|
||||
}
|
||||
List<Edge> edgesTitle2 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS));
|
||||
|
||||
List<Edge> loadedEdgesTitle1 = new ArrayList<>();
|
||||
PageLink pageLink = new PageLink(15, 0, title1);
|
||||
@ -687,7 +726,7 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
||||
.andExpect(status().isOk());
|
||||
}
|
||||
|
||||
int cntEntity = loadedEdgesTitle1.size();
|
||||
cntEntity = loadedEdgesTitle1.size();
|
||||
testNotifyManyEntityManyTimeMsgToEdgeServiceEntityEqAnyAdditionalInfoAny(new Edge(), new Edge(),
|
||||
savedTenant.getId(), customerId, tenantAdmin.getId(), tenantAdmin.getEmail(),
|
||||
ActionType.UNASSIGNED_FROM_CUSTOMER, ActionType.UNASSIGNED_FROM_CUSTOMER, cntEntity, cntEntity, 3);
|
||||
@ -719,30 +758,39 @@ public abstract class BaseEdgeControllerTest extends AbstractControllerTest {
|
||||
customer = doPost("/api/customer", customer, Customer.class);
|
||||
CustomerId customerId = customer.getId();
|
||||
|
||||
int cntEntity = 125;
|
||||
String title1 = "Edge title 1";
|
||||
String type1 = "typeC";
|
||||
List<Edge> edgesType1 = new ArrayList<>();
|
||||
for (int i = 0; i < 125; i++) {
|
||||
futures = new ArrayList<>(cntEntity);
|
||||
for (int i = 0; i < cntEntity; i++) {
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
String name = title1 + suffix;
|
||||
name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase();
|
||||
Edge edge = constructEdge(name, type1);
|
||||
edge = doPost("/api/edge", edge, Edge.class);
|
||||
edgesType1.add(doPost("/api/customer/" + customerId.getId().toString()
|
||||
+ "/edge/" + edge.getId().getId().toString(), Edge.class));
|
||||
futures.add(executor.submit(() -> {
|
||||
Edge edge1 = doPost("/api/edge", edge, Edge.class);
|
||||
return doPost("/api/customer/" + customerId.getId().toString()
|
||||
+ "/edge/" + edge1.getId().getId().toString(), Edge.class);
|
||||
}));
|
||||
}
|
||||
List<Edge> edgesType1 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS));
|
||||
|
||||
cntEntity = 143;
|
||||
String title2 = "Edge title 2";
|
||||
String type2 = "typeD";
|
||||
List<Edge> edgesType2 = new ArrayList<>();
|
||||
for (int i = 0; i < 143; i++) {
|
||||
futures = new ArrayList<>(cntEntity);
|
||||
for (int i = 0; i < cntEntity; i++) {
|
||||
String suffix = StringUtils.randomAlphanumeric(15);
|
||||
String name = title2 + suffix;
|
||||
name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase();
|
||||
Edge edge = constructEdge(name, type2);
|
||||
edge = doPost("/api/edge", edge, Edge.class);
|
||||
edgesType2.add(doPost("/api/customer/" + customerId.getId().toString()
|
||||
+ "/edge/" + edge.getId().getId().toString(), Edge.class));
|
||||
futures.add(executor.submit(() -> {
|
||||
Edge edge1 = doPost("/api/edge", edge, Edge.class);
|
||||
return doPost("/api/customer/" + customerId.getId().toString()
|
||||
+ "/edge/" + edge1.getId().getId().toString(), Edge.class);
|
||||
}));
|
||||
}
|
||||
List<Edge> edgesType2 = new ArrayList<>(Futures.allAsList(futures).get(TIMEOUT, TimeUnit.SECONDS));
|
||||
|
||||
List<Edge> loadedEdgesType1 = new ArrayList<>();
|
||||
PageLink pageLink = new PageLink(15, 0, title1);
|
||||
|
||||
@ -501,7 +501,6 @@ abstract public class BaseDeviceEdgeTest extends AbstractEdgeTest {
|
||||
Assert.assertTrue(deviceUpdateMsgOpt.isPresent());
|
||||
DeviceUpdateMsg latestDeviceUpdateMsg = deviceUpdateMsgOpt.get();
|
||||
Assert.assertNotEquals(deviceOnCloudName, latestDeviceUpdateMsg.getName());
|
||||
Assert.assertEquals(deviceOnCloudName, latestDeviceUpdateMsg.getConflictName());
|
||||
|
||||
UUID newDeviceId = new UUID(latestDeviceUpdateMsg.getIdMSB(), latestDeviceUpdateMsg.getIdLSB());
|
||||
|
||||
|
||||
@ -34,5 +34,5 @@ public enum EdgeEventActionType {
|
||||
ASSIGNED_TO_EDGE,
|
||||
UNASSIGNED_FROM_EDGE,
|
||||
CREDENTIALS_REQUEST,
|
||||
ENTITY_MERGE_REQUEST
|
||||
ENTITY_MERGE_REQUEST // deprecated
|
||||
}
|
||||
@ -111,7 +111,7 @@ enum UpdateMsgType {
|
||||
ENTITY_DELETED_RPC_MESSAGE = 2;
|
||||
ALARM_ACK_RPC_MESSAGE = 3;
|
||||
ALARM_CLEAR_RPC_MESSAGE = 4;
|
||||
ENTITY_MERGE_RPC_MESSAGE = 5;
|
||||
ENTITY_MERGE_RPC_MESSAGE = 5; // deprecated
|
||||
}
|
||||
|
||||
message EntityDataProto {
|
||||
@ -199,7 +199,7 @@ message DeviceUpdateMsg {
|
||||
string type = 9;
|
||||
optional string label = 10;
|
||||
optional string additionalInfo = 11;
|
||||
optional string conflictName = 12;
|
||||
optional string conflictName = 12; // deprecated
|
||||
optional int64 firmwareIdMSB = 13;
|
||||
optional int64 firmwareIdLSB = 14;
|
||||
optional bytes deviceDataBytes = 15;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user