Refactoring: process correct log warning, remove extra locks to process msg from edge

This commit is contained in:
Andrii Landiak 2023-08-16 09:24:00 +03:00
parent 308674ddc7
commit 5ee110b47e
12 changed files with 80 additions and 101 deletions

View File

@ -674,9 +674,9 @@ public final class EdgeGrpcSession implements Closeable {
result.add(ctx.getDeviceProcessor().processDeviceCredentialsMsg(edge.getTenantId(), deviceCredentialsUpdateMsg)); result.add(ctx.getDeviceProcessor().processDeviceCredentialsMsg(edge.getTenantId(), deviceCredentialsUpdateMsg));
} }
} }
if (uplinkMsg.getDeviceProfileUpdateMsgCount() > 0) { if (uplinkMsg.getAssetProfileUpdateMsgCount() > 0) {
for (DeviceProfileUpdateMsg deviceProfileUpdateMsg : uplinkMsg.getDeviceProfileUpdateMsgList()) { for (AssetProfileUpdateMsg assetProfileUpdateMsg : uplinkMsg.getAssetProfileUpdateMsgList()) {
result.add(ctx.getDeviceProfileProcessor().processDeviceProfileMsgFromEdge(edge.getTenantId(), edge, deviceProfileUpdateMsg)); result.add(ctx.getAssetProfileProcessor().processAssetProfileMsgFromEdge(edge.getTenantId(), edge, assetProfileUpdateMsg));
} }
} }
if (uplinkMsg.getAssetUpdateMsgCount() > 0) { if (uplinkMsg.getAssetUpdateMsgCount() > 0) {
@ -684,16 +684,16 @@ public final class EdgeGrpcSession implements Closeable {
result.add(ctx.getAssetProcessor().processAssetMsgFromEdge(edge.getTenantId(), edge, assetUpdateMsg)); result.add(ctx.getAssetProcessor().processAssetMsgFromEdge(edge.getTenantId(), edge, assetUpdateMsg));
} }
} }
if (uplinkMsg.getAssetProfileUpdateMsgCount() > 0) {
for (AssetProfileUpdateMsg assetProfileUpdateMsg : uplinkMsg.getAssetProfileUpdateMsgList()) {
result.add(ctx.getAssetProfileProcessor().processAssetProfileMsgFromEdge(edge.getTenantId(), edge, assetProfileUpdateMsg));
}
}
if (uplinkMsg.getAlarmUpdateMsgCount() > 0) { if (uplinkMsg.getAlarmUpdateMsgCount() > 0) {
for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) { for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) {
result.add(ctx.getAlarmProcessor().processAlarmMsg(edge.getTenantId(), alarmUpdateMsg)); result.add(ctx.getAlarmProcessor().processAlarmMsg(edge.getTenantId(), alarmUpdateMsg));
} }
} }
if (uplinkMsg.getDeviceProfileUpdateMsgCount() > 0) {
for (DeviceProfileUpdateMsg deviceProfileUpdateMsg : uplinkMsg.getDeviceProfileUpdateMsgList()) {
result.add(ctx.getDeviceProfileProcessor().processDeviceProfileMsgFromEdge(edge.getTenantId(), edge, deviceProfileUpdateMsg));
}
}
if (uplinkMsg.getRelationUpdateMsgCount() > 0) { if (uplinkMsg.getRelationUpdateMsgCount() > 0) {
for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) { for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) {
result.add(ctx.getRelationProcessor().processRelationMsg(edge.getTenantId(), relationUpdateMsg)); result.add(ctx.getRelationProcessor().processRelationMsg(edge.getTenantId(), relationUpdateMsg));

View File

@ -114,11 +114,7 @@ import java.util.concurrent.locks.ReentrantLock;
public abstract class BaseEdgeProcessor { public abstract class BaseEdgeProcessor {
protected static final Lock deviceCreationLock = new ReentrantLock(); protected static final Lock deviceCreationLock = new ReentrantLock();
protected static final Lock deviceProfileCreationLock = new ReentrantLock();
protected static final Lock assetCreationLock = new ReentrantLock(); protected static final Lock assetCreationLock = new ReentrantLock();
protected static final Lock assetProfileCreationLock = new ReentrantLock();
protected static final Lock dashboardCreationLock = new ReentrantLock();
protected static final Lock entityViewCreationLock = new ReentrantLock();
protected static final int DEFAULT_PAGE_SIZE = 100; protected static final int DEFAULT_PAGE_SIZE = 100;

View File

@ -115,7 +115,7 @@ public class AssetEdgeProcessor extends BaseAssetProcessor {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.debug("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", asset, t); log.warn("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", asset, t);
} }
}); });
} catch (JsonProcessingException | IllegalArgumentException e) { } catch (JsonProcessingException | IllegalArgumentException e) {

View File

@ -64,12 +64,8 @@ public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor {
return handleUnsupportedMsgType(assetProfileUpdateMsg.getMsgType()); return handleUnsupportedMsgType(assetProfileUpdateMsg.getMsgType());
} }
} catch (DataValidationException e) { } catch (DataValidationException e) {
if (e.getMessage().contains("limit reached")) { log.warn("Failed to process AssetProfileUpdateMsg from Edge [{}]", assetProfileUpdateMsg, e);
log.warn("[{}] Number of allowed asset profile violated {}", tenantId, assetProfileUpdateMsg, e); return Futures.immediateFailedFuture(e);
return Futures.immediateFuture(null);
} else {
return Futures.immediateFailedFuture(e);
}
} finally { } finally {
edgeSynchronizationManager.getSync().remove(); edgeSynchronizationManager.getSync().remove();
} }
@ -97,7 +93,7 @@ public class AssetProfileEdgeProcessor extends BaseAssetProfileProcessor {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.debug("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", assetProfile, t); log.warn("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", assetProfile, t);
} }
}); });
} catch (JsonProcessingException | IllegalArgumentException e) { } catch (JsonProcessingException | IllegalArgumentException e) {

View File

@ -33,7 +33,7 @@ public class BaseAssetProfileProcessor extends BaseEdgeProcessor {
protected boolean saveOrUpdateAssetProfile(TenantId tenantId, AssetProfileId assetProfileId, AssetProfileUpdateMsg assetProfileUpdateMsg) { protected boolean saveOrUpdateAssetProfile(TenantId tenantId, AssetProfileId assetProfileId, AssetProfileUpdateMsg assetProfileUpdateMsg) {
boolean created = false; boolean created = false;
assetProfileCreationLock.lock(); assetCreationLock.lock();
try { try {
AssetProfile assetProfile = assetProfileService.findAssetProfileById(tenantId, assetProfileId); AssetProfile assetProfile = assetProfileService.findAssetProfileById(tenantId, assetProfileId);
String assetProfileName = assetProfileUpdateMsg.getName(); String assetProfileName = assetProfileUpdateMsg.getName();
@ -62,7 +62,7 @@ public class BaseAssetProfileProcessor extends BaseEdgeProcessor {
} }
assetProfileService.saveAssetProfile(assetProfile, false); assetProfileService.saveAssetProfile(assetProfile, false);
} finally { } finally {
assetProfileCreationLock.unlock(); assetCreationLock.unlock();
} }
return created; return created;
} }

View File

@ -34,39 +34,35 @@ public abstract class BaseDashboardProcessor extends BaseEdgeProcessor {
protected boolean saveOrUpdateDashboard(TenantId tenantId, DashboardId dashboardId, DashboardUpdateMsg dashboardUpdateMsg, CustomerId customerId) { protected boolean saveOrUpdateDashboard(TenantId tenantId, DashboardId dashboardId, DashboardUpdateMsg dashboardUpdateMsg, CustomerId customerId) {
boolean created = false; boolean created = false;
dashboardCreationLock.lock(); Dashboard dashboard = dashboardService.findDashboardById(tenantId, dashboardId);
try { if (dashboard == null) {
Dashboard dashboard = dashboardService.findDashboardById(tenantId, dashboardId); created = true;
if (dashboard == null) { dashboard = new Dashboard();
created = true; dashboard.setTenantId(tenantId);
dashboard = new Dashboard(); dashboard.setCreatedTime(Uuids.unixTimestamp(dashboardId.getId()));
dashboard.setTenantId(tenantId); }
dashboard.setCreatedTime(Uuids.unixTimestamp(dashboardId.getId())); dashboard.setTitle(dashboardUpdateMsg.getTitle());
} dashboard.setConfiguration(JacksonUtil.toJsonNode(dashboardUpdateMsg.getConfiguration()));
dashboard.setTitle(dashboardUpdateMsg.getTitle()); Set<ShortCustomerInfo> assignedCustomers = null;
dashboard.setConfiguration(JacksonUtil.toJsonNode(dashboardUpdateMsg.getConfiguration())); if (dashboardUpdateMsg.hasAssignedCustomers()) {
Set<ShortCustomerInfo> assignedCustomers = null; assignedCustomers = JacksonUtil.fromString(dashboardUpdateMsg.getAssignedCustomers(), new TypeReference<>() {
if (dashboardUpdateMsg.hasAssignedCustomers()) { });
assignedCustomers = JacksonUtil.fromString(dashboardUpdateMsg.getAssignedCustomers(), new TypeReference<>() {}); dashboard.setAssignedCustomers(assignedCustomers);
dashboard.setAssignedCustomers(assignedCustomers); }
}
dashboardValidator.validate(dashboard, Dashboard::getTenantId); dashboardValidator.validate(dashboard, Dashboard::getTenantId);
if (created) { if (created) {
dashboard.setId(dashboardId); dashboard.setId(dashboardId);
} }
Dashboard savedDashboard = dashboardService.saveDashboard(dashboard, false); Dashboard savedDashboard = dashboardService.saveDashboard(dashboard, false);
if (assignedCustomers != null && !assignedCustomers.isEmpty()) { if (assignedCustomers != null && !assignedCustomers.isEmpty()) {
for (ShortCustomerInfo assignedCustomer : assignedCustomers) { for (ShortCustomerInfo assignedCustomer : assignedCustomers) {
if (assignedCustomer.getCustomerId().equals(customerId)) { if (assignedCustomer.getCustomerId().equals(customerId)) {
dashboardService.assignDashboardToCustomer(tenantId, dashboardId, assignedCustomer.getCustomerId()); dashboardService.assignDashboardToCustomer(tenantId, dashboardId, assignedCustomer.getCustomerId());
}
} }
} else {
unassignCustomersFromDashboard(tenantId, savedDashboard);
} }
} finally { } else {
dashboardCreationLock.unlock(); unassignCustomersFromDashboard(tenantId, savedDashboard);
} }
return created; return created;
} }

View File

@ -105,7 +105,7 @@ public class DashboardEdgeProcessor extends BaseDashboardProcessor {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.debug("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", dashboard, t); log.warn("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", dashboard, t);
} }
}); });
} catch (JsonProcessingException | IllegalArgumentException e) { } catch (JsonProcessingException | IllegalArgumentException e) {

View File

@ -45,7 +45,7 @@ public class BaseDeviceProfileProcessor extends BaseEdgeProcessor {
protected boolean saveOrUpdateDeviceProfile(TenantId tenantId, DeviceProfileId deviceProfileId, DeviceProfileUpdateMsg deviceProfileUpdateMsg) { protected boolean saveOrUpdateDeviceProfile(TenantId tenantId, DeviceProfileId deviceProfileId, DeviceProfileUpdateMsg deviceProfileUpdateMsg) {
boolean created = false; boolean created = false;
deviceProfileCreationLock.lock(); deviceCreationLock.lock();
try { try {
DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId); DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId);
if (deviceProfile == null) { if (deviceProfile == null) {
@ -95,7 +95,7 @@ public class BaseDeviceProfileProcessor extends BaseEdgeProcessor {
} }
deviceProfileService.saveDeviceProfile(deviceProfile, false); deviceProfileService.saveDeviceProfile(deviceProfile, false);
} finally { } finally {
deviceProfileCreationLock.unlock(); deviceCreationLock.unlock();
} }
return created; return created;
} }

View File

@ -124,7 +124,7 @@ public class DeviceEdgeProcessor extends BaseDeviceProcessor {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.debug("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", device, t); log.warn("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", device, t);
} }
}); });
} catch (JsonProcessingException | IllegalArgumentException e) { } catch (JsonProcessingException | IllegalArgumentException e) {

View File

@ -65,12 +65,8 @@ public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor {
return handleUnsupportedMsgType(deviceProfileUpdateMsg.getMsgType()); return handleUnsupportedMsgType(deviceProfileUpdateMsg.getMsgType());
} }
} catch (DataValidationException e) { } catch (DataValidationException e) {
if (e.getMessage().contains("limit reached")) { log.warn("Failed to process DeviceProfileUpdateMsg from Edge [{}]", deviceProfileUpdateMsg, e);
log.warn("[{}] Number of allowed device profile violated {}", tenantId, deviceProfileUpdateMsg, e); return Futures.immediateFailedFuture(e);
return Futures.immediateFuture(null);
} else {
return Futures.immediateFailedFuture(e);
}
} finally { } finally {
edgeSynchronizationManager.getSync().remove(); edgeSynchronizationManager.getSync().remove();
} }
@ -98,7 +94,7 @@ public class DeviceProfileEdgeProcessor extends BaseDeviceProfileProcessor {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.debug("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", deviceProfile, t); log.warn("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", deviceProfile, t);
} }
}); });
} catch (JsonProcessingException | IllegalArgumentException e) { } catch (JsonProcessingException | IllegalArgumentException e) {

View File

@ -38,44 +38,39 @@ public abstract class BaseEntityViewProcessor extends BaseEdgeProcessor {
protected Pair<Boolean, Boolean> saveOrUpdateEntityView(TenantId tenantId, EntityViewId entityViewId, EntityViewUpdateMsg entityViewUpdateMsg, CustomerId customerId) { protected Pair<Boolean, Boolean> saveOrUpdateEntityView(TenantId tenantId, EntityViewId entityViewId, EntityViewUpdateMsg entityViewUpdateMsg, CustomerId customerId) {
boolean created = false; boolean created = false;
boolean entityViewNameUpdated = false; boolean entityViewNameUpdated = false;
entityViewCreationLock.lock(); EntityView entityView = entityViewService.findEntityViewById(tenantId, entityViewId);
try { String entityViewName = entityViewUpdateMsg.getName();
EntityView entityView = entityViewService.findEntityViewById(tenantId, entityViewId); if (entityView == null) {
String entityViewName = entityViewUpdateMsg.getName(); created = true;
if (entityView == null) { entityView = new EntityView();
created = true; entityView.setTenantId(tenantId);
entityView = new EntityView(); entityView.setCreatedTime(Uuids.unixTimestamp(entityViewId.getId()));
entityView.setTenantId(tenantId);
entityView.setCreatedTime(Uuids.unixTimestamp(entityViewId.getId()));
}
EntityView entityViewByName = entityViewService.findEntityViewByTenantIdAndName(tenantId, entityViewName);
if (entityViewByName != null && !entityViewByName.getId().equals(entityViewId)) {
entityViewName = entityViewName + "_" + StringUtils.randomAlphanumeric(15);
log.warn("Entity view with name {} already exists. Renaming entity view name to {}",
entityViewUpdateMsg.getName(), entityViewName);
entityViewNameUpdated = true;
}
entityView.setName(entityViewName);
entityView.setType(entityViewUpdateMsg.getType());
entityView.setCustomerId(customerId);
entityView.setAdditionalInfo(entityViewUpdateMsg.hasAdditionalInfo() ?
JacksonUtil.toJsonNode(entityViewUpdateMsg.getAdditionalInfo()) : null);
UUID entityIdUUID = safeGetUUID(entityViewUpdateMsg.getEntityIdMSB(), entityViewUpdateMsg.getEntityIdLSB());
if (EdgeEntityType.DEVICE.equals(entityViewUpdateMsg.getEntityType())) {
entityView.setEntityId(entityIdUUID != null ? new DeviceId(entityIdUUID) : null);
} else if (EdgeEntityType.ASSET.equals(entityViewUpdateMsg.getEntityType())) {
entityView.setEntityId(entityIdUUID != null ? new AssetId(entityIdUUID) : null);
}
entityViewValidator.validate(entityView, EntityView::getTenantId);
if (created) {
entityView.setId(entityViewId);
}
entityViewService.saveEntityView(entityView, false);
} finally {
entityViewCreationLock.unlock();
} }
EntityView entityViewByName = entityViewService.findEntityViewByTenantIdAndName(tenantId, entityViewName);
if (entityViewByName != null && !entityViewByName.getId().equals(entityViewId)) {
entityViewName = entityViewName + "_" + StringUtils.randomAlphanumeric(15);
log.warn("Entity view with name {} already exists. Renaming entity view name to {}",
entityViewUpdateMsg.getName(), entityViewName);
entityViewNameUpdated = true;
}
entityView.setName(entityViewName);
entityView.setType(entityViewUpdateMsg.getType());
entityView.setCustomerId(customerId);
entityView.setAdditionalInfo(entityViewUpdateMsg.hasAdditionalInfo() ?
JacksonUtil.toJsonNode(entityViewUpdateMsg.getAdditionalInfo()) : null);
UUID entityIdUUID = safeGetUUID(entityViewUpdateMsg.getEntityIdMSB(), entityViewUpdateMsg.getEntityIdLSB());
if (EdgeEntityType.DEVICE.equals(entityViewUpdateMsg.getEntityType())) {
entityView.setEntityId(entityIdUUID != null ? new DeviceId(entityIdUUID) : null);
} else if (EdgeEntityType.ASSET.equals(entityViewUpdateMsg.getEntityType())) {
entityView.setEntityId(entityIdUUID != null ? new AssetId(entityIdUUID) : null);
}
entityViewValidator.validate(entityView, EntityView::getTenantId);
if (created) {
entityView.setId(entityViewId);
}
entityViewService.saveEntityView(entityView, false);
return Pair.of(created, entityViewNameUpdated); return Pair.of(created, entityViewNameUpdated);
} }
} }

View File

@ -113,7 +113,7 @@ public class EntityViewEdgeProcessor extends BaseEntityViewProcessor {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
log.debug("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", entityView, t); log.warn("Failed to send ENTITY_CREATED EVENT to rule engine [{}]", entityView, t);
} }
}); });
} catch (JsonProcessingException | IllegalArgumentException e) { } catch (JsonProcessingException | IllegalArgumentException e) {