Make save edge event method async

This commit is contained in:
Volodymyr Babak 2022-04-20 17:17:46 +03:00
parent 00de756e03
commit f8687cb983
13 changed files with 367 additions and 167 deletions

View File

@ -35,6 +35,7 @@ import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -97,7 +98,6 @@ import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
@ -811,12 +811,6 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
} }
private void saveRpcRequestToEdgeQueue(ToDeviceRpcRequest msg, Integer requestId) { private void saveRpcRequestToEdgeQueue(ToDeviceRpcRequest msg, Integer requestId) {
EdgeEvent edgeEvent = new EdgeEvent();
edgeEvent.setTenantId(tenantId);
edgeEvent.setAction(EdgeEventActionType.RPC_CALL);
edgeEvent.setEntityId(deviceId.getId());
edgeEvent.setType(EdgeEventType.DEVICE);
ObjectNode body = mapper.createObjectNode(); ObjectNode body = mapper.createObjectNode();
body.put("requestId", requestId); body.put("requestId", requestId);
body.put("requestUUID", msg.getId().toString()); body.put("requestUUID", msg.getId().toString());
@ -824,11 +818,21 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
body.put("expirationTime", msg.getExpirationTime()); body.put("expirationTime", msg.getExpirationTime());
body.put("method", msg.getBody().getMethod()); body.put("method", msg.getBody().getMethod());
body.put("params", msg.getBody().getParams()); body.put("params", msg.getBody().getParams());
edgeEvent.setBody(body);
edgeEvent.setEdgeId(edgeId); EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DEVICE, EdgeEventActionType.RPC_CALL, deviceId, body);
systemContext.getEdgeEventService().save(edgeEvent);
systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId); Futures.addCallback(systemContext.getEdgeEventService().saveAsync(edgeEvent), new FutureCallback<>() {
@Override
public void onSuccess(Void unused) {
systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
}
@Override
public void onFailure(Throwable t) {
String errMsg = String.format("Failed to save edge event. msg [%s], edge event [%s]", msg, edgeEvent);
log.warn(errMsg, t);
}
}, systemContext.getDbCallbackExecutor());
} }
private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) { private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {

View File

@ -16,11 +16,15 @@
package org.thingsboard.server.service.edge; package org.thingsboard.server.service.edge;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventActionType;
@ -76,17 +80,17 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
@Autowired @Autowired
private CustomerEdgeProcessor customerProcessor; private CustomerEdgeProcessor customerProcessor;
private ExecutorService tsCallBackExecutor; private ExecutorService dbCallBackExecutor;
@PostConstruct @PostConstruct
public void initExecutor() { public void initExecutor() {
tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-notifications")); dbCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-notifications"));
} }
@PreDestroy @PreDestroy
public void shutdownExecutor() { public void shutdownExecutor() {
if (tsCallBackExecutor != null) { if (dbCallBackExecutor != null) {
tsCallBackExecutor.shutdownNow(); dbCallBackExecutor.shutdownNow();
} }
} }
@ -107,17 +111,20 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
log.debug("Pushing edge event to edge queue. tenantId [{}], edgeId [{}], type [{}], action[{}], entityId [{}], body [{}]", log.debug("Pushing edge event to edge queue. tenantId [{}], edgeId [{}], type [{}], action[{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body); tenantId, edgeId, type, action, entityId, body);
EdgeEvent edgeEvent = new EdgeEvent(); EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body);
edgeEvent.setEdgeId(edgeId);
edgeEvent.setTenantId(tenantId); Futures.addCallback(edgeEventService.saveAsync(edgeEvent), new FutureCallback<>() {
edgeEvent.setType(type); @Override
edgeEvent.setAction(action); public void onSuccess(@Nullable Void unused) {
if (entityId != null) { clusterService.onEdgeEventUpdate(tenantId, edgeId);
edgeEvent.setEntityId(entityId.getId()); }
}
edgeEvent.setBody(body); @Override
edgeEventService.save(edgeEvent); public void onFailure(Throwable t) {
clusterService.onEdgeEventUpdate(tenantId, edgeId); String errMsg = String.format("Failed to save edge event. edge event [%s]", edgeEvent);
log.warn(errMsg, t);
}
}, dbCallBackExecutor);
} }
@Override @Override

View File

@ -17,10 +17,14 @@ package org.thingsboard.server.service.edge.rpc.processor;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.HasCustomerId; import org.thingsboard.server.common.data.HasCustomerId;
import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
@ -189,17 +193,20 @@ public abstract class BaseEdgeProcessor {
"action [{}], entityId [{}], body [{}]", "action [{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body); tenantId, edgeId, type, action, entityId, body);
EdgeEvent edgeEvent = new EdgeEvent(); EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body);
edgeEvent.setTenantId(tenantId);
edgeEvent.setEdgeId(edgeId); Futures.addCallback(edgeEventService.saveAsync(edgeEvent), new FutureCallback<>() {
edgeEvent.setType(type); @Override
edgeEvent.setAction(action); public void onSuccess(@Nullable Void unused) {
if (entityId != null) { tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
edgeEvent.setEntityId(entityId.getId()); }
}
edgeEvent.setBody(body); @Override
edgeEventService.save(edgeEvent); public void onFailure(Throwable t) {
tbClusterService.onEdgeEventUpdate(tenantId, edgeId); String errMsg = String.format("Failed to save edge event. edge event [%s]", edgeEvent);
log.warn(errMsg, t);
}
}, dbCallbackExecutorService);
} }
protected CustomerId getCustomerIdIfEdgeAssignedToCustomer(HasCustomerId hasCustomerIdEntity, Edge edge) { protected CustomerId getCustomerIdIfEdgeAssignedToCustomer(HasCustomerId hasCustomerIdEntity, Edge edge) {

View File

@ -405,8 +405,18 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body); EdgeEvent edgeEvent = EdgeUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body);
edgeEventService.save(edgeEvent); Futures.addCallback(edgeEventService.saveAsync(edgeEvent), new FutureCallback<>() {
tbClusterService.onEdgeEventUpdate(tenantId, edgeId); @Override
public void onSuccess(@Nullable Void unused) {
tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
}
@Override
public void onFailure(Throwable t) {
String errMsg = String.format("Failed to save edge event. edge event [%s]", edgeEvent);
log.warn(errMsg, t);
}
}, dbCallbackExecutorService);
} }
} }

View File

@ -976,7 +976,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
String timeseriesData = "{\"data\":{\"temperature\":25},\"ts\":" + System.currentTimeMillis() + "}"; String timeseriesData = "{\"data\":{\"temperature\":25},\"ts\":" + System.currentTimeMillis() + "}";
JsonNode timeseriesEntityData = mapper.readTree(timeseriesData); JsonNode timeseriesEntityData = mapper.readTree(timeseriesData);
EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData); EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData);
edgeEventService.save(edgeEvent); edgeEventService.saveAsync(edgeEvent).get();
clusterService.onEdgeEventUpdate(tenantId, edge.getId()); clusterService.onEdgeEventUpdate(tenantId, edge.getId());
Assert.assertTrue(edgeImitator.waitForMessages()); Assert.assertTrue(edgeImitator.waitForMessages());
@ -1007,12 +1007,12 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
testAttributesDeleteMsg(device); testAttributesDeleteMsg(device);
} }
private void testAttributesUpdatedMsg(Device device) throws JsonProcessingException, InterruptedException { private void testAttributesUpdatedMsg(Device device) throws Exception {
String attributesData = "{\"scope\":\"SERVER_SCOPE\",\"kv\":{\"key1\":\"value1\"}}"; String attributesData = "{\"scope\":\"SERVER_SCOPE\",\"kv\":{\"key1\":\"value1\"}}";
JsonNode attributesEntityData = mapper.readTree(attributesData); JsonNode attributesEntityData = mapper.readTree(attributesData);
EdgeEvent edgeEvent1 = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, attributesEntityData); EdgeEvent edgeEvent1 = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_UPDATED, device.getId().getId(), EdgeEventType.DEVICE, attributesEntityData);
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(1);
edgeEventService.save(edgeEvent1); edgeEventService.saveAsync(edgeEvent1).get();
clusterService.onEdgeEventUpdate(tenantId, edge.getId()); clusterService.onEdgeEventUpdate(tenantId, edge.getId());
Assert.assertTrue(edgeImitator.waitForMessages()); Assert.assertTrue(edgeImitator.waitForMessages());
@ -1032,12 +1032,12 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
Assert.assertEquals("value1", keyValueProto.getStringV()); Assert.assertEquals("value1", keyValueProto.getStringV());
} }
private void testPostAttributesMsg(Device device) throws JsonProcessingException, InterruptedException { private void testPostAttributesMsg(Device device) throws Exception {
String postAttributesData = "{\"scope\":\"SERVER_SCOPE\",\"kv\":{\"key2\":\"value2\"}}"; String postAttributesData = "{\"scope\":\"SERVER_SCOPE\",\"kv\":{\"key2\":\"value2\"}}";
JsonNode postAttributesEntityData = mapper.readTree(postAttributesData); JsonNode postAttributesEntityData = mapper.readTree(postAttributesData);
EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.POST_ATTRIBUTES, device.getId().getId(), EdgeEventType.DEVICE, postAttributesEntityData); EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.POST_ATTRIBUTES, device.getId().getId(), EdgeEventType.DEVICE, postAttributesEntityData);
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(1);
edgeEventService.save(edgeEvent); edgeEventService.saveAsync(edgeEvent).get();
clusterService.onEdgeEventUpdate(tenantId, edge.getId()); clusterService.onEdgeEventUpdate(tenantId, edge.getId());
Assert.assertTrue(edgeImitator.waitForMessages()); Assert.assertTrue(edgeImitator.waitForMessages());
@ -1057,12 +1057,12 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
Assert.assertEquals("value2", keyValueProto.getStringV()); Assert.assertEquals("value2", keyValueProto.getStringV());
} }
private void testAttributesDeleteMsg(Device device) throws JsonProcessingException, InterruptedException { private void testAttributesDeleteMsg(Device device) throws Exception {
String deleteAttributesData = "{\"scope\":\"SERVER_SCOPE\",\"keys\":[\"key1\",\"key2\"]}"; String deleteAttributesData = "{\"scope\":\"SERVER_SCOPE\",\"keys\":[\"key1\",\"key2\"]}";
JsonNode deleteAttributesEntityData = mapper.readTree(deleteAttributesData); JsonNode deleteAttributesEntityData = mapper.readTree(deleteAttributesData);
EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_DELETED, device.getId().getId(), EdgeEventType.DEVICE, deleteAttributesEntityData); EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.ATTRIBUTES_DELETED, device.getId().getId(), EdgeEventType.DEVICE, deleteAttributesEntityData);
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(1);
edgeEventService.save(edgeEvent); edgeEventService.saveAsync(edgeEvent).get();
clusterService.onEdgeEventUpdate(tenantId, edge.getId()); clusterService.onEdgeEventUpdate(tenantId, edge.getId());
Assert.assertTrue(edgeImitator.waitForMessages()); Assert.assertTrue(edgeImitator.waitForMessages());
@ -1097,7 +1097,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL, device.getId().getId(), EdgeEventType.DEVICE, body); EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.RPC_CALL, device.getId().getId(), EdgeEventType.DEVICE, body);
edgeImitator.expectMessageAmount(1); edgeImitator.expectMessageAmount(1);
edgeEventService.save(edgeEvent); edgeEventService.saveAsync(edgeEvent).get();
clusterService.onEdgeEventUpdate(tenantId, edge.getId()); clusterService.onEdgeEventUpdate(tenantId, edge.getId());
Assert.assertTrue(edgeImitator.waitForMessages()); Assert.assertTrue(edgeImitator.waitForMessages());
@ -1122,7 +1122,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
JsonNode timeseriesEntityData = mapper.readTree(timeseriesData); JsonNode timeseriesEntityData = mapper.readTree(timeseriesData);
EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED, EdgeEvent edgeEvent = constructEdgeEvent(tenantId, edge.getId(), EdgeEventActionType.TIMESERIES_UPDATED,
device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData); device.getId().getId(), EdgeEventType.DEVICE, timeseriesEntityData);
edgeEventService.save(edgeEvent); edgeEventService.saveAsync(edgeEvent).get();
clusterService.onEdgeEventUpdate(tenantId, edge.getId()); clusterService.onEdgeEventUpdate(tenantId, edge.getId());
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.dao.edge; package org.thingsboard.server.dao.edge;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
@ -23,7 +24,7 @@ import org.thingsboard.server.common.data.page.TimePageLink;
public interface EdgeEventService { public interface EdgeEventService {
EdgeEvent save(EdgeEvent edgeEvent); ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent);
PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate); PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate);

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.dao.edge; package org.thingsboard.server.dao.edge;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -36,9 +37,9 @@ public class BaseEdgeEventService implements EdgeEventService {
private DataValidator<EdgeEvent> edgeEventValidator; private DataValidator<EdgeEvent> edgeEventValidator;
@Override @Override
public EdgeEvent save(EdgeEvent edgeEvent) { public ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent) {
edgeEventValidator.validate(edgeEvent, EdgeEvent::getTenantId); edgeEventValidator.validate(edgeEvent, EdgeEvent::getTenantId);
return edgeEventDao.save(edgeEvent); return edgeEventDao.saveAsync(edgeEvent);
} }
@Override @Override

View File

@ -35,7 +35,7 @@ public interface EdgeEventDao extends Dao<EdgeEvent> {
* @param edgeEvent the event object * @param edgeEvent the event object
* @return saved edge event object future * @return saved edge event object future
*/ */
EdgeEvent save(EdgeEvent edgeEvent); ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent);
/** /**

View File

@ -0,0 +1,79 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.edge;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.thingsboard.server.dao.model.sql.EdgeEventEntity;
import org.thingsboard.server.dao.util.PsqlDao;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
@PsqlDao
@Repository
@Transactional
public class EdgeEventInsertRepository {
private static final String INSERT =
"INSERT INTO edge_event (id, created_time, edge_id, edge_event_type, edge_event_uid, entity_id, edge_event_action, body, tenant_id, ts) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " +
"ON CONFLICT DO NOTHING;";
@Autowired
protected JdbcTemplate jdbcTemplate;
@Autowired
private TransactionTemplate transactionTemplate;
protected void save(List<EdgeEventEntity> entities) {
transactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
jdbcTemplate.batchUpdate(INSERT, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
EdgeEventEntity edgeEvent = entities.get(i);
ps.setObject(1, edgeEvent.getId());
ps.setLong(2, edgeEvent.getCreatedTime());
ps.setObject(3, edgeEvent.getEdgeId());
ps.setString(4, edgeEvent.getEdgeEventType().name());
ps.setString(5, edgeEvent.getEdgeEventUid());
ps.setObject(6, edgeEvent.getEntityId());
ps.setString(7, edgeEvent.getEdgeEventAction().name());
ps.setString(8, edgeEvent.getEntityBody() != null
? edgeEvent.getEntityBody().toString()
: null);
ps.setObject(9, edgeEvent.getTenantId());
ps.setLong(10, edgeEvent.getTs());
}
@Override
public int getBatchSize() {
return entities.size();
}
});
}
});
}
}

View File

@ -16,9 +16,11 @@
package org.thingsboard.server.dao.sql.edge; package org.thingsboard.server.dao.sql.edge;
import com.datastax.oss.driver.api.core.uuid.Uuids; import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
@ -26,15 +28,22 @@ import org.thingsboard.server.common.data.id.EdgeEventId;
import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.edge.EdgeEventDao; import org.thingsboard.server.dao.edge.EdgeEventDao;
import org.thingsboard.server.dao.model.sql.EdgeEventEntity; import org.thingsboard.server.dao.model.sql.EdgeEventEntity;
import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Comparator;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
@ -43,6 +52,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID;
@ -52,11 +62,32 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
private final UUID systemTenantId = NULL_UUID; private final UUID systemTenantId = NULL_UUID;
private final ConcurrentMap<EdgeId, Lock> readWriteLocks = new ConcurrentHashMap<>(); @Autowired
ScheduledLogExecutorComponent logExecutor;
@Autowired
private StatsFactory statsFactory;
@Value("${sql.edge_events.batch_size:10000}")
private int batchSize;
@Value("${sql.edge_events.batch_max_delay:100}")
private long maxDelay;
@Value("${sql.edge_events.stats_print_interval_ms:10000}")
private long statsPrintIntervalMs;
@Value("${sql.edge_events.batch_threads:3}")
private int batchThreads;
private TbSqlBlockingQueueWrapper<EdgeEventEntity> queue;
@Autowired @Autowired
private EdgeEventRepository edgeEventRepository; private EdgeEventRepository edgeEventRepository;
@Autowired
private EdgeEventInsertRepository edgeEventInsertRepository;
@Override @Override
protected Class<EdgeEventEntity> getEntityClass() { protected Class<EdgeEventEntity> getEntityClass() {
return EdgeEventEntity.class; return EdgeEventEntity.class;
@ -67,66 +98,58 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
return edgeEventRepository; return edgeEventRepository;
} }
@Override @PostConstruct
public EdgeEvent save(EdgeEvent edgeEvent) { private void init() {
final Lock readWriteLock = readWriteLocks.computeIfAbsent(edgeEvent.getEdgeId(), id -> new ReentrantLock()); TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder()
readWriteLock.lock(); .logName("Edge Events")
try { .batchSize(batchSize)
log.debug("Save edge event [{}] ", edgeEvent); .maxDelay(maxDelay)
if (edgeEvent.getId() == null) { .statsPrintIntervalMs(statsPrintIntervalMs)
UUID timeBased = Uuids.timeBased(); .statsNamePrefix("edge.events")
edgeEvent.setId(new EdgeEventId(timeBased)); .batchSortEnabled(true)
edgeEvent.setCreatedTime(Uuids.unixTimestamp(timeBased)); .build();
} else if (edgeEvent.getCreatedTime() == 0L) { Function<EdgeEventEntity, Integer> hashcodeFunction = entity -> {
UUID eventId = edgeEvent.getId().getId(); if (entity.getEntityId() != null) {
if (eventId.version() == 1) { return entity.getEntityId().hashCode();
edgeEvent.setCreatedTime(Uuids.unixTimestamp(eventId));
} else {
edgeEvent.setCreatedTime(System.currentTimeMillis());
}
}
if (StringUtils.isEmpty(edgeEvent.getUid())) {
edgeEvent.setUid(edgeEvent.getId().toString());
}
return save(new EdgeEventEntity(edgeEvent)).orElse(null);
} finally {
readWriteLock.unlock();
}
}
@Override
public PageData<EdgeEvent> findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) {
final Lock readWriteLock = readWriteLocks.computeIfAbsent(edgeId, id -> new ReentrantLock());
readWriteLock.lock();
try {
if (withTsUpdate) {
return DaoUtil.toPageData(
edgeEventRepository
.findEdgeEventsByTenantIdAndEdgeId(
tenantId,
edgeId.getId(),
Objects.toString(pageLink.getTextSearch(), ""),
pageLink.getStartTime(),
pageLink.getEndTime(),
DaoUtil.toPageable(pageLink)));
} else { } else {
return DaoUtil.toPageData( return NULL_UUID.hashCode();
edgeEventRepository
.findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(
tenantId,
edgeId.getId(),
Objects.toString(pageLink.getTextSearch(), ""),
pageLink.getStartTime(),
pageLink.getEndTime(),
DaoUtil.toPageable(pageLink)));
} }
} finally { };
readWriteLock.unlock(); queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads, statsFactory);
queue.init(logExecutor, v -> edgeEventInsertRepository.save(v),
Comparator.comparing(EdgeEventEntity::getTs)
);
}
@PreDestroy
private void destroy() {
if (queue != null) {
queue.destroy();
} }
} }
public Optional<EdgeEvent> save(EdgeEventEntity entity) { @Override
public ListenableFuture<Void> saveAsync(EdgeEvent edgeEvent) {
log.debug("Save edge event [{}] ", edgeEvent);
if (edgeEvent.getId() == null) {
UUID timeBased = Uuids.timeBased();
edgeEvent.setId(new EdgeEventId(timeBased));
edgeEvent.setCreatedTime(Uuids.unixTimestamp(timeBased));
} else if (edgeEvent.getCreatedTime() == 0L) {
UUID eventId = edgeEvent.getId().getId();
if (eventId.version() == 1) {
edgeEvent.setCreatedTime(Uuids.unixTimestamp(eventId));
} else {
edgeEvent.setCreatedTime(System.currentTimeMillis());
}
}
if (StringUtils.isEmpty(edgeEvent.getUid())) {
edgeEvent.setUid(edgeEvent.getId().toString());
}
return save(new EdgeEventEntity(edgeEvent));
}
private ListenableFuture<Void> save(EdgeEventEntity entity) {
log.debug("Save edge event [{}] ", entity); log.debug("Save edge event [{}] ", entity);
if (entity.getTenantId() == null) { if (entity.getTenantId() == null) {
log.trace("Save system edge event with predefined id {}", systemTenantId); log.trace("Save system edge event with predefined id {}", systemTenantId);
@ -135,7 +158,39 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
if (entity.getUuid() == null) { if (entity.getUuid() == null) {
entity.setUuid(Uuids.timeBased()); entity.setUuid(Uuids.timeBased());
} }
return Optional.of(DaoUtil.getData(edgeEventRepository.save(entity)));
return addToQueue(entity);
}
private ListenableFuture<Void> addToQueue(EdgeEventEntity entity) {
return queue.add(entity);
}
@Override
public PageData<EdgeEvent> findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) {
if (withTsUpdate) {
return DaoUtil.toPageData(
edgeEventRepository
.findEdgeEventsByTenantIdAndEdgeId(
tenantId,
edgeId.getId(),
Objects.toString(pageLink.getTextSearch(), ""),
pageLink.getStartTime(),
pageLink.getEndTime(),
DaoUtil.toPageable(pageLink)));
} else {
return DaoUtil.toPageData(
edgeEventRepository
.findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(
tenantId,
edgeId.getId(),
Objects.toString(pageLink.getTextSearch(), ""),
pageLink.getStartTime(),
pageLink.getEndTime(),
DaoUtil.toPageable(pageLink)));
}
} }
@Override @Override

View File

@ -16,6 +16,8 @@
package org.thingsboard.server.dao.service; package org.thingsboard.server.dao.service;
import com.datastax.oss.driver.api.core.uuid.Uuids; import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEvent;
@ -34,6 +36,8 @@ import java.io.IOException;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.Month; import java.time.Month;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest { public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
@ -41,8 +45,14 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
public void saveEdgeEvent() throws Exception { public void saveEdgeEvent() throws Exception {
EdgeId edgeId = new EdgeId(Uuids.timeBased()); EdgeId edgeId = new EdgeId(Uuids.timeBased());
DeviceId deviceId = new DeviceId(Uuids.timeBased()); DeviceId deviceId = new DeviceId(Uuids.timeBased());
EdgeEvent edgeEvent = generateEdgeEvent(null, edgeId, deviceId, EdgeEventActionType.ADDED); TenantId tenantId = new TenantId(Uuids.timeBased());
EdgeEvent saved = edgeEventService.save(edgeEvent); EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.ADDED);
edgeEventService.saveAsync(edgeEvent).get();
PageData<EdgeEvent> edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, new TimePageLink(1), false);
Assert.assertFalse(edgeEvents.getData().isEmpty());
EdgeEvent saved = edgeEvents.getData().get(0);
Assert.assertEquals(saved.getTenantId(), edgeEvent.getTenantId()); Assert.assertEquals(saved.getTenantId(), edgeEvent.getTenantId());
Assert.assertEquals(saved.getEdgeId(), edgeEvent.getEdgeId()); Assert.assertEquals(saved.getEdgeId(), edgeEvent.getEdgeId());
Assert.assertEquals(saved.getEntityId(), edgeEvent.getEntityId()); Assert.assertEquals(saved.getEntityId(), edgeEvent.getEntityId());
@ -77,27 +87,31 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
EdgeId edgeId = new EdgeId(Uuids.timeBased()); EdgeId edgeId = new EdgeId(Uuids.timeBased());
DeviceId deviceId = new DeviceId(Uuids.timeBased()); DeviceId deviceId = new DeviceId(Uuids.timeBased());
TenantId tenantId = TenantId.fromUUID(Uuids.timeBased()); TenantId tenantId = TenantId.fromUUID(Uuids.timeBased());
saveEdgeEventWithProvidedTime(timeBeforeStartTime, edgeId, deviceId, tenantId);
EdgeEvent savedEdgeEvent = saveEdgeEventWithProvidedTime(eventTime, edgeId, deviceId, tenantId); List<ListenableFuture<Void>> futures = new ArrayList<>();
EdgeEvent savedEdgeEvent2 = saveEdgeEventWithProvidedTime(eventTime + 1, edgeId, deviceId, tenantId); futures.add(saveEdgeEventWithProvidedTime(timeBeforeStartTime, edgeId, deviceId, tenantId));
EdgeEvent savedEdgeEvent3 = saveEdgeEventWithProvidedTime(eventTime + 2, edgeId, deviceId, tenantId); futures.add(saveEdgeEventWithProvidedTime(eventTime, edgeId, deviceId, tenantId));
saveEdgeEventWithProvidedTime(timeAfterEndTime, edgeId, deviceId, tenantId); futures.add(saveEdgeEventWithProvidedTime(eventTime + 1, edgeId, deviceId, tenantId));
futures.add(saveEdgeEventWithProvidedTime(eventTime + 2, edgeId, deviceId, tenantId));
futures.add(saveEdgeEventWithProvidedTime(timeAfterEndTime, edgeId, deviceId, tenantId));
Futures.allAsList(futures).get();
TimePageLink pageLink = new TimePageLink(2, 0, "", new SortOrder("createdTime", SortOrder.Direction.DESC), startTime, endTime); TimePageLink pageLink = new TimePageLink(2, 0, "", new SortOrder("createdTime", SortOrder.Direction.DESC), startTime, endTime);
PageData<EdgeEvent> edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true); PageData<EdgeEvent> edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true);
Assert.assertNotNull(edgeEvents.getData()); Assert.assertNotNull(edgeEvents.getData());
Assert.assertTrue(edgeEvents.getData().size() == 2); Assert.assertEquals(2, edgeEvents.getData().size());
Assert.assertTrue(edgeEvents.getData().get(0).getUuidId().equals(savedEdgeEvent3.getUuidId())); Assert.assertEquals(Uuids.startOf(eventTime + 2), edgeEvents.getData().get(0).getUuidId());
Assert.assertTrue(edgeEvents.getData().get(1).getUuidId().equals(savedEdgeEvent2.getUuidId())); Assert.assertEquals(Uuids.startOf(eventTime + 1), edgeEvents.getData().get(1).getUuidId());
Assert.assertTrue(edgeEvents.hasNext()); Assert.assertTrue(edgeEvents.hasNext());
Assert.assertNotNull(pageLink.nextPageLink()); Assert.assertNotNull(pageLink.nextPageLink());
edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink.nextPageLink(), true); edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink.nextPageLink(), true);
Assert.assertNotNull(edgeEvents.getData()); Assert.assertNotNull(edgeEvents.getData());
Assert.assertTrue(edgeEvents.getData().size() == 1); Assert.assertEquals(1, edgeEvents.getData().size());
Assert.assertTrue(edgeEvents.getData().get(0).getUuidId().equals(savedEdgeEvent.getUuidId())); Assert.assertEquals(Uuids.startOf(eventTime), edgeEvents.getData().get(0).getUuidId());
Assert.assertFalse(edgeEvents.hasNext()); Assert.assertFalse(edgeEvents.hasNext());
} }
@ -109,7 +123,7 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime", SortOrder.Direction.ASC)); TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime", SortOrder.Direction.ASC));
EdgeEvent edgeEventWithTsUpdate = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.TIMESERIES_UPDATED); EdgeEvent edgeEventWithTsUpdate = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.TIMESERIES_UPDATED);
edgeEventService.save(edgeEventWithTsUpdate); edgeEventService.saveAsync(edgeEventWithTsUpdate).get();
PageData<EdgeEvent> allEdgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true); PageData<EdgeEvent> allEdgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true);
PageData<EdgeEvent> edgeEventsWithoutTsUpdate = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, false); PageData<EdgeEvent> edgeEventsWithoutTsUpdate = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, false);
@ -121,9 +135,9 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
Assert.assertTrue(edgeEventsWithoutTsUpdate.getData().isEmpty()); Assert.assertTrue(edgeEventsWithoutTsUpdate.getData().isEmpty());
} }
private EdgeEvent saveEdgeEventWithProvidedTime(long time, EdgeId edgeId, EntityId entityId, TenantId tenantId) throws Exception { private ListenableFuture<Void> saveEdgeEventWithProvidedTime(long time, EdgeId edgeId, EntityId entityId, TenantId tenantId) throws Exception {
EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, entityId, EdgeEventActionType.ADDED); EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, entityId, EdgeEventActionType.ADDED);
edgeEvent.setId(new EdgeEventId(Uuids.startOf(time))); edgeEvent.setId(new EdgeEventId(Uuids.startOf(time)));
return edgeEventService.save(edgeEvent); return edgeEventService.saveAsync(edgeEvent);
} }
} }

View File

@ -80,10 +80,6 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
if (DataConstants.ALARM.equals(msgType)) { if (DataConstants.ALARM.equals(msgType)) {
return buildEvent(ctx.getTenantId(), EdgeEventActionType.ADDED, getUUIDFromMsgData(msg), getAlarmEventType(), null); return buildEvent(ctx.getTenantId(), EdgeEventActionType.ADDED, getUUIDFromMsgData(msg), getAlarmEventType(), null);
} else { } else {
U eventTypeByEntityType = getEventTypeByEntityType(msg.getOriginator().getEntityType());
if (eventTypeByEntityType == null) {
return null;
}
EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msgType); EdgeEventActionType actionType = getEdgeEventActionTypeByMsgType(msgType);
Map<String, Object> entityBody = new HashMap<>(); Map<String, Object> entityBody = new HashMap<>();
Map<String, String> metadata = msg.getMetaData().getData(); Map<String, String> metadata = msg.getMetaData().getData();
@ -107,7 +103,11 @@ public abstract class AbstractTbMsgPushNode<T extends BaseTbMsgPushNodeConfigura
entityBody.put("ts", msg.getMetaDataTs()); entityBody.put("ts", msg.getMetaDataTs());
break; break;
} }
return buildEvent(ctx.getTenantId(), actionType, msg.getOriginator().getId(), eventTypeByEntityType, JacksonUtil.valueToTree(entityBody)); return buildEvent(ctx.getTenantId(),
actionType,
msg.getOriginator().getId(),
getEventTypeByEntityType(msg.getOriginator().getEntityType()),
JacksonUtil.valueToTree(entityBody));
} }
} }

View File

@ -16,7 +16,11 @@
package org.thingsboard.rule.engine.edge; package org.thingsboard.rule.engine.edge;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.DataConstants;
@ -33,6 +37,8 @@ import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID; import java.util.UUID;
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS;
@ -107,54 +113,70 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode<TbMsgPushToEdgeNo
@Override @Override
protected void processMsg(TbContext ctx, TbMsg msg) { protected void processMsg(TbContext ctx, TbMsg msg) {
if (EntityType.EDGE.equals(msg.getOriginator().getEntityType())) { try {
EdgeEvent edgeEvent = buildEvent(msg, ctx); if (EntityType.EDGE.equals(msg.getOriginator().getEntityType())) {
if (edgeEvent != null) { EdgeEvent edgeEvent = buildEvent(msg, ctx);
EdgeId edgeId = new EdgeId(msg.getOriginator().getId()); EdgeId edgeId = new EdgeId(msg.getOriginator().getId());
notifyEdge(ctx, msg, edgeEvent, edgeId); ListenableFuture<Void> future = notifyEdge(ctx, edgeEvent, edgeId);
FutureCallback<Void> futureCallback = new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void result) {
ctx.tellSuccess(msg);
}
@Override
public void onFailure(Throwable t) {
ctx.tellFailure(msg, t);
}
};
Futures.addCallback(future, futureCallback, ctx.getDbCallbackExecutor());
} else { } else {
tellFailure(ctx, msg); PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE);
} PageData<EdgeId> pageData;
} else { List<ListenableFuture<Void>> futures = new ArrayList<>();
PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); do {
PageData<EdgeId> pageData; pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink);
boolean edgeNotified = false; if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
do { for (EdgeId edgeId : pageData.getData()) {
pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink); EdgeEvent edgeEvent = buildEvent(msg, ctx);
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { futures.add(notifyEdge(ctx, edgeEvent, edgeId));
for (EdgeId edgeId : pageData.getData()) { }
EdgeEvent edgeEvent = buildEvent(msg, ctx); if (pageData.hasNext()) {
if (edgeEvent != null) { pageLink = pageLink.nextPageLink();
notifyEdge(ctx, msg, edgeEvent, edgeId);
edgeNotified = true;
} else {
tellFailure(ctx, msg);
} }
} }
if (pageData.hasNext()) { } while (pageData != null && pageData.hasNext());
pageLink = pageLink.nextPageLink();
}
}
} while (pageData != null && pageData.hasNext());
if (!edgeNotified) { if (futures.isEmpty()) {
// ack in case no edges are related to provided entity // ack in case no edges are related to provided entity
ctx.ack(msg); ctx.ack(msg);
} else {
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Void> voids) {
ctx.tellSuccess(msg);
}
@Override
public void onFailure(Throwable t) {
ctx.tellFailure(msg, t);
}
}, ctx.getDbCallbackExecutor());
}
} }
} catch (Exception e) {
log.error("Failed to build edge event", e);
ctx.tellFailure(msg, e);
} }
} }
private void tellFailure(TbContext ctx, TbMsg msg) { private ListenableFuture<Void> notifyEdge(TbContext ctx, EdgeEvent edgeEvent, EdgeId edgeId) {
String errMsg = String.format("Edge event type is null. Entity Type %s", msg.getOriginator().getEntityType());
log.warn(errMsg);
ctx.tellFailure(msg, new RuntimeException(errMsg));
}
private void notifyEdge(TbContext ctx, TbMsg msg, EdgeEvent edgeEvent, EdgeId edgeId) {
edgeEvent.setEdgeId(edgeId); edgeEvent.setEdgeId(edgeId);
ctx.getEdgeEventService().save(edgeEvent); ListenableFuture<Void> future = ctx.getEdgeEventService().saveAsync(edgeEvent);
ctx.tellNext(msg, SUCCESS); return Futures.transform(future, result -> {
ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId); ctx.onEdgeEventUpdate(ctx.getTenantId(), edgeId);
return null;
}, ctx.getDbCallbackExecutor());
} }
} }