diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 4840f2a0e8..94581b91a7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -21,6 +21,9 @@ import akka.actor.Scheduler; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import lombok.Getter; @@ -66,6 +69,7 @@ import org.thingsboard.server.service.script.JsSandboxService; import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; +import javax.annotation.Nullable; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; @@ -314,22 +318,22 @@ public class ActorSystemContext { } public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) { - persistDebug(tenantId, entityId, "IN", tbMsg, relationType, null); + persistDebugAsync(tenantId, entityId, "IN", tbMsg, relationType, null); } public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) { - persistDebug(tenantId, entityId, "IN", tbMsg, relationType, error); + persistDebugAsync(tenantId, entityId, "IN", tbMsg, relationType, error); } public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) { - persistDebug(tenantId, entityId, "OUT", tbMsg, relationType, error); + persistDebugAsync(tenantId, entityId, "OUT", tbMsg, relationType, error); } public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) { - persistDebug(tenantId, entityId, "OUT", tbMsg, relationType, null); + persistDebugAsync(tenantId, entityId, "OUT", tbMsg, relationType, null); } - private void persistDebug(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) { + private void persistDebugAsync(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) { try { Event event = new Event(); event.setTenantId(tenantId); @@ -355,7 +359,18 @@ public class ActorSystemContext { } event.setBody(node); - eventService.save(event); + ListenableFuture future = eventService.saveAsync(event); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable Event event) { + + } + + @Override + public void onFailure(Throwable th) { + log.error("Could not save debug Event for Node", th); + } + }); } catch (IOException ex) { log.warn("Failed to persist rule node debug message", ex); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java index 55da480079..7dddec17ef 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/BaseEventService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.event; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -43,6 +44,12 @@ public class BaseEventService implements EventService { return eventDao.save(event); } + @Override + public ListenableFuture saveAsync(Event event) { + eventValidator.validate(event); + return eventDao.saveAsync(event); + } + @Override public Optional saveIfNotExists(Event event) { eventValidator.validate(event); diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java index 23655bb358..7549e40108 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/CassandraBaseEventDao.java @@ -15,11 +15,13 @@ */ package org.thingsboard.server.dao.event; -import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.querybuilder.Insert; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; import com.datastax.driver.core.utils.UUIDs; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; @@ -38,13 +40,11 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutionException; import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; -import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BY_ID_VIEW_NAME; -import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BY_TYPE_AND_ID_VIEW_NAME; -import static org.thingsboard.server.dao.model.ModelConstants.EVENT_COLUMN_FAMILY_NAME; -import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; +import static org.thingsboard.server.dao.model.ModelConstants.*; @Component @Slf4j @@ -65,6 +65,15 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao saveAsync(Event event) { log.debug("Save event [{}] ", event); if (event.getTenantId() == null) { log.trace("Save system event with predefined id {}", systemTenantId); @@ -76,7 +85,8 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao> optionalSave = saveAsync(new EventEntity(event), false); + return Futures.transform(optionalSave, opt -> opt.orElse(null)); } @Override @@ -153,6 +163,14 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao save(EventEntity entity, boolean ifNotExists) { + try { + return saveAsync(entity, ifNotExists).get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException("Could not save EventEntity", e); + } + } + + private ListenableFuture> saveAsync(EventEntity entity, boolean ifNotExists) { if (entity.getId() == null) { entity.setId(UUIDs.timeBased()); } @@ -167,11 +185,13 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao { + if (rs.wasApplied()) { + return Optional.of(DaoUtil.getData(entity)); + } else { + return Optional.empty(); + } + }); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java index eb0fdbcd80..9469c61f63 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/EventDao.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.event; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.page.TimePageLink; @@ -37,6 +38,14 @@ public interface EventDao extends Dao { */ Event save(Event event); + /** + * Save or update event object async + * + * @param event the event object + * @return saved event object future + */ + ListenableFuture saveAsync(Event event); + /** * Save event object if it is not yet saved * diff --git a/dao/src/main/java/org/thingsboard/server/dao/event/EventService.java b/dao/src/main/java/org/thingsboard/server/dao/event/EventService.java index 64f823d017..0698c6b0b5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/event/EventService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/event/EventService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.dao.event; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.Event; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -28,6 +29,8 @@ public interface EventService { Event save(Event event); + ListenableFuture saveAsync(Event event); + Optional saveIfNotExists(Event event); Optional findEvent(TenantId tenantId, EntityId entityId, String eventType, String eventUid); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java index 01183a2d3f..5a63ed8a46 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/event/JpaBaseEventDao.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao.sql.event; import com.datastax.driver.core.utils.UUIDs; +import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -81,6 +82,18 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao saveAsync(Event event) { + log.debug("Save event [{}] ", event); + if (event.getId() == null) { + event.setId(new EventId(UUIDs.timeBased())); + } + if (StringUtils.isEmpty(event.getUid())) { + event.setUid(event.getId().toString()); + } + return service.submit(() -> save(new EventEntity(event), false).orElse(null)); + } + @Override public Optional saveIfNotExists(Event event) { return save(new EventEntity(event), true); @@ -89,7 +102,7 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao