Merge branch 'feature/edge' into develop/3.0-edge

This commit is contained in:
Volodymyr Babak 2020-04-20 14:58:31 +03:00
commit 767d0cfc58
11 changed files with 69 additions and 44 deletions

View File

@ -21,6 +21,7 @@ import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable.FetchFunction;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainType;
public class TenantRuleChainManager extends RuleChainManager {
@ -48,6 +49,6 @@ public class TenantRuleChainManager extends RuleChainManager {
@Override
protected FetchFunction<RuleChain> getFetchEntitiesFunction() {
return link -> service.findTenantRuleChains(tenantId, link);
return link -> service.findTenantRuleChainsByType(tenantId, RuleChainType.SYSTEM, link);
}
}

View File

@ -136,17 +136,11 @@ public class TenantActor extends RuleChainManagerActor {
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
RuleChain ruleChain = null;
if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) {
ruleChain = systemContext.getRuleChainService().findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId()));
if (ruleChain !=null && !RuleChainType.SYSTEM.equals(ruleChain.getType())) {
log.debug("[{}] Non SYSTEM rule chains are ignored and not started. Current rule chain type [{}]", tenantId, ruleChain.getType());
return;
}
}
ActorRef target = getEntityActorRef(msg.getEntityId());
if (target != null) {
if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN && ruleChain != null) {
if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) {
RuleChain ruleChain = systemContext.getRuleChainService().
findRuleChainById(tenantId, new RuleChainId(msg.getEntityId().getId()));
ruleChainManager.visit(ruleChain, target);
}
target.tell(msg, ActorRef.noSender());

View File

@ -242,7 +242,7 @@ public class RuleChainController extends BaseController {
RuleChainType type = RuleChainType.valueOf(typeStr);
return checkNotNull(ruleChainService.findTenantRuleChainsByType(tenantId, type, pageLink));
} else {
return checkNotNull(ruleChainService.findTenantRuleChains(tenantId, pageLink));
return checkNotNull(ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.SYSTEM, pageLink));
}
} catch (Exception e) {
throw handleException(e);

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings;
import org.thingsboard.server.service.edge.rpc.constructor.AlarmUpdateMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor;
@ -108,4 +109,8 @@ public class EdgeContextComponent {
@Lazy
@Autowired
private DashboardUpdateMsgConstructor dashboardUpdateMsgConstructor;
@Lazy
@Autowired
private EdgeEventStorageSettings edgeEventStorageSettings;
}

View File

@ -0,0 +1,32 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@Data
public class EdgeEventStorageSettings {
@Value("${edges.rpc.storage.max_read_records_count}")
private int maxReadRecordsCount;
@Value("${edges.rpc.storage.no_read_records_sleep}")
private long noRecordsSleepInterval;
@Value("${edges.rpc.storage.sleep_between_batches}")
private long sleepIntervalBetweenBatches;
}

View File

@ -56,7 +56,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase {
private boolean sslEnabled;
@Value("${edges.rpc.ssl.cert}")
private String certFileResource;
@Value("${edges.rpc.ssl.privateKey}")
@Value("${edges.rpc.ssl.private_key}")
private String privateKeyResource;
@Autowired

View File

@ -101,6 +101,8 @@ public final class EdgeGrpcSession implements Cloneable {
private static final ReentrantLock entityCreationLock = new ReentrantLock();
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
private final UUID sessionId;
private final BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener;
private final Consumer<EdgeId> sessionCloseListener;
@ -163,8 +165,7 @@ public final class EdgeGrpcSession implements Cloneable {
void processHandleMessages() throws ExecutionException, InterruptedException {
Long queueStartTs = getQueueStartTs().get();
// TODO: this 100 value must be changed properly
TimePageLink pageLink = new TimePageLink(30, 0, "", null, queueStartTs + 1000, null);
TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, "", null, queueStartTs, null);
PageData<Event> pageData;
UUID ifOffset = null;
do {
@ -173,10 +174,8 @@ public final class EdgeGrpcSession implements Cloneable {
log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
for (Event event : pageData.getData()) {
log.trace("[{}] Processing event [{}]", this.sessionId, event);
EdgeQueueEntry entry;
try {
entry = objectMapper.treeToValue(event.getBody(), EdgeQueueEntry.class);
EdgeQueueEntry entry = objectMapper.treeToValue(event.getBody(), EdgeQueueEntry.class);
UpdateMsgType msgType = getResponseMsgType(entry.getType());
switch (msgType) {
case ENTITY_DELETED_RPC_MESSAGE:
@ -201,6 +200,11 @@ public final class EdgeGrpcSession implements Cloneable {
}
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
try {
Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches());
} catch (InterruptedException e) {
log.error("Error during sleep between batches", e);
}
}
} while (pageData.hasNext());
@ -209,7 +213,7 @@ public final class EdgeGrpcSession implements Cloneable {
updateQueueStartTs(newStartTs);
}
try {
Thread.sleep(1000);
Thread.sleep(ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval());
} catch (InterruptedException e) {
log.error("Error during sleep", e);
}
@ -339,13 +343,14 @@ public final class EdgeGrpcSession implements Cloneable {
}
private void updateQueueStartTs(Long newStartTs) {
List<AttributeKvEntry> attributes = Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry("queueStartTs", newStartTs), System.currentTimeMillis()));
newStartTs = ++newStartTs; // increments ts by 1 - next edge event search starts from current offset + 1
List<AttributeKvEntry> attributes = Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()));
ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
}
private ListenableFuture<Long> getQueueStartTs() {
ListenableFuture<Optional<AttributeKvEntry>> future =
ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, "queueStartTs");
ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY);
return Futures.transform(future, attributeKvEntryOpt -> {
if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) {
AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get();

View File

@ -570,7 +570,11 @@ edges:
# Enable/disable SSL support
enabled: "${EDGES_RPC_SSL_ENABLED:false}"
cert: "${EDGES_RPC_SSL_CERT:certChainFile.pem}"
privateKey: "${EDGES_RPC_SSL_PRIVATE_KEY:privateKeyFile.pem}"
private_key: "${EDGES_RPC_SSL_PRIVATE_KEY:privateKeyFile.pem}"
storage:
max_read_records_count: "${EDGES_RPC_STORAGE_MAX_READ_RECORDS_COUNT:50}"
no_read_records_sleep: "${EDGES_RPC_NO_READ_RECORDS_SLEEP:1000}"
sleep_between_batches: "${EDGES_RPC_SLEEP_BETWEEN_BATCHES:1000}"
swagger:
api_path_regex: "${SWAGGER_API_PATH_REGEX:/api.*}"

View File

@ -60,8 +60,6 @@ public interface RuleChainService {
List<EntityRelation> getRuleNodeRelations(TenantId tenantId, RuleNodeId ruleNodeId);
PageData<RuleChain> findTenantRuleChains(TenantId tenantId, PageLink pageLink);
PageData<RuleChain> findTenantRuleChainsByType(TenantId tenantId, RuleChainType type, PageLink pageLink);
void deleteRuleChainById(TenantId tenantId, RuleChainId ruleChainId);

View File

@ -352,13 +352,6 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
return validRelations;
}
@Override
public PageData<RuleChain> findTenantRuleChains(TenantId tenantId, PageLink pageLink) {
Validator.validateId(tenantId, "Incorrect tenant id for search rule chain request.");
Validator.validatePageLink(pageLink);
return ruleChainDao.findRuleChainsByTenantId(tenantId.getId(), pageLink);
}
@Override
public PageData<RuleChain> findTenantRuleChainsByType(TenantId tenantId, RuleChainType type, PageLink pageLink) {
Validator.validateId(tenantId, "Incorrect tenant id for search rule chain request.");
@ -544,7 +537,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
throw new DataValidationException("Rule chain name should be specified!");
}
if (ruleChain.getType() == null) {
throw new DataValidationException("Rule chain type should be specified!");
ruleChain.setType(RuleChainType.SYSTEM);
}
if (ruleChain.getTenantId() == null || ruleChain.getTenantId().isNullUid()) {
throw new DataValidationException("Rule chain should be assigned to tenant!");

View File

@ -66,7 +66,6 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
public void testSaveRuleChain() throws IOException {
RuleChain ruleChain = new RuleChain();
ruleChain.setTenantId(tenantId);
ruleChain.setType(RuleChainType.SYSTEM);
ruleChain.setName("My RuleChain");
RuleChain savedRuleChain = ruleChainService.saveRuleChain(ruleChain);
@ -105,7 +104,6 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
RuleChain ruleChain = new RuleChain();
ruleChain.setTenantId(tenantId);
ruleChain.setName("My RuleChain");
ruleChain.setType(RuleChainType.SYSTEM);
RuleChain savedRuleChain = ruleChainService.saveRuleChain(ruleChain);
RuleChain foundRuleChain = ruleChainService.findRuleChainById(tenantId, savedRuleChain.getId());
Assert.assertNotNull(foundRuleChain);
@ -118,7 +116,6 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
RuleChain ruleChain = new RuleChain();
ruleChain.setTenantId(tenantId);
ruleChain.setName("My RuleChain");
ruleChain.setType(RuleChainType.SYSTEM);
RuleChain savedRuleChain = ruleChainService.saveRuleChain(ruleChain);
RuleChain foundRuleChain = ruleChainService.findRuleChainById(tenantId, savedRuleChain.getId());
Assert.assertNotNull(foundRuleChain);
@ -138,7 +135,6 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
List<RuleChain> ruleChains = new ArrayList<>();
for (int i = 0; i < 165; i++) {
RuleChain ruleChain = new RuleChain();
ruleChain.setType(RuleChainType.SYSTEM);
ruleChain.setTenantId(tenantId);
ruleChain.setName("RuleChain" + i);
ruleChains.add(ruleChainService.saveRuleChain(ruleChain));
@ -148,7 +144,7 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
PageLink pageLink = new PageLink(16);
PageData<RuleChain> pageData = null;
do {
pageData = ruleChainService.findTenantRuleChains(tenantId, pageLink);
pageData = ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.SYSTEM, pageLink);
loadedRuleChains.addAll(pageData.getData());
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
@ -163,7 +159,7 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
ruleChainService.deleteRuleChainsByTenantId(tenantId);
pageLink = new PageLink(31);
pageData = ruleChainService.findTenantRuleChains(tenantId, pageLink);
pageData = ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.SYSTEM, pageLink);
Assert.assertFalse(pageData.hasNext());
Assert.assertTrue(pageData.getData().isEmpty());
@ -177,7 +173,6 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
for (int i = 0; i < 123; i++) {
RuleChain ruleChain = new RuleChain();
ruleChain.setTenantId(tenantId);
ruleChain.setType(RuleChainType.SYSTEM);
String suffix = RandomStringUtils.randomAlphanumeric((int) (Math.random() * 17));
String name = name1 + suffix;
name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase();
@ -189,7 +184,6 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
for (int i = 0; i < 193; i++) {
RuleChain ruleChain = new RuleChain();
ruleChain.setTenantId(tenantId);
ruleChain.setType(RuleChainType.SYSTEM);
String suffix = RandomStringUtils.randomAlphanumeric((int) (Math.random() * 15));
String name = name2 + suffix;
name = i % 2 == 0 ? name.toLowerCase() : name.toUpperCase();
@ -201,7 +195,7 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
PageLink pageLink = new PageLink(19, 0, name1);
PageData<RuleChain> pageData = null;
do {
pageData = ruleChainService.findTenantRuleChains(tenantId, pageLink);
pageData = ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.SYSTEM, pageLink);
loadedRuleChainsName1.addAll(pageData.getData());
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
@ -216,7 +210,7 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
List<RuleChain> loadedRuleChainsName2 = new ArrayList<>();
pageLink = new PageLink(4, 0, name2);
do {
pageData = ruleChainService.findTenantRuleChains(tenantId, pageLink);
pageData = ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.SYSTEM, pageLink);
loadedRuleChainsName2.addAll(pageData.getData());
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
@ -233,7 +227,7 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
}
pageLink = new PageLink(4, 0, name1);
pageData = ruleChainService.findTenantRuleChains(tenantId, pageLink);
pageData = ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.SYSTEM, pageLink);
Assert.assertFalse(pageData.hasNext());
Assert.assertEquals(0, pageData.getData().size());
@ -242,7 +236,7 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
}
pageLink = new PageLink(4, 0, name2);
pageData = ruleChainService.findTenantRuleChains(tenantId, pageLink);
pageData = ruleChainService.findTenantRuleChainsByType(tenantId, RuleChainType.SYSTEM, pageLink);
Assert.assertFalse(pageData.hasNext());
Assert.assertEquals(0, pageData.getData().size());
}
@ -327,7 +321,6 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest {
private RuleChainMetaData createRuleChainMetadata() throws Exception {
RuleChain ruleChain = new RuleChain();
ruleChain.setName("My RuleChain");
ruleChain.setType(RuleChainType.SYSTEM);
ruleChain.setTenantId(tenantId);
RuleChain savedRuleChain = ruleChainService.saveRuleChain(ruleChain);