added SequentialByOriginator queue to coap, http, mqtt and refactored

This commit is contained in:
YevhenBondarenko 2020-05-14 19:00:53 +03:00 committed by Andrew Shvayka
parent f5b4ebbd1b
commit c2596b8ecb
5 changed files with 65 additions and 20 deletions

View File

@ -48,7 +48,7 @@ public class TbSynchronizationBeginNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
log.warn("Synchronization Start/End nodes are deprecated since TB 2.5. Use queue with submit strategy SEQUENTIAL_WITHIN_ORIGINATOR instead.");
log.warn("Synchronization Start/End nodes are deprecated since TB 2.5. Use queue with submit strategy SEQUENTIAL_BY_ORIGINATOR instead.");
ctx.tellSuccess(msg);
}

View File

@ -49,7 +49,7 @@ public class TbSynchronizationEndNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
log.warn("Synchronization Start/End nodes are deprecated since TB 2.5. Use queue with submit strategy SEQUENTIAL_WITHIN_ORIGINATOR instead.");
log.warn("Synchronization Start/End nodes are deprecated since TB 2.5. Use queue with submit strategy SEQUENTIAL_BY_ORIGINATOR instead.");
ctx.tellSuccess(msg);
}

View File

@ -155,19 +155,19 @@ queue:
pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
stats:
enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}"
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:60000}"
queues:
- name: "Main"
- name: "${TB_QUEUE_RE_MAIN_QUEUE_NAME:Main}"
topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}"
poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:60000}"
submit-strategy:
type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL
type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
# For BATCH only
batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch
processing-strategy:
type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:SKIP_ALL_FAILURES}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
@ -175,10 +175,10 @@ queue:
- name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_HP_PARTITIONS:3}"
partitions: "${TB_QUEUE_RE_HP_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}"
submit-strategy:
type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_WITHIN_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL
type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
# For BATCH only
batch-size: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch
processing-strategy:
@ -187,6 +187,21 @@ queue:
retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
- name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}"
topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}"
poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_SQ_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:60000}"
submit-strategy:
type: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_BY_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
# For BATCH only
batch-size: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch
processing-strategy:
type: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
transport:
# For high priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"

View File

@ -156,19 +156,19 @@ queue:
pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
stats:
enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}"
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:60000}"
queues:
- name: "Main"
- name: "${TB_QUEUE_RE_MAIN_QUEUE_NAME:Main}"
topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}"
poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:60000}"
submit-strategy:
type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL
type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
# For BATCH only
batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch
processing-strategy:
type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:SKIP_ALL_FAILURES}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
@ -176,10 +176,10 @@ queue:
- name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_HP_PARTITIONS:3}"
partitions: "${TB_QUEUE_RE_HP_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}"
submit-strategy:
type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_WITHIN_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL
type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
# For BATCH only
batch-size: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch
processing-strategy:
@ -188,6 +188,21 @@ queue:
retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
- name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}"
topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}"
poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_SQ_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:60000}"
submit-strategy:
type: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_BY_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
# For BATCH only
batch-size: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch
processing-strategy:
type: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
transport:
# For high priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"

View File

@ -176,19 +176,19 @@ queue:
pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
stats:
enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}"
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:60000}"
queues:
- name: "Main"
- name: "${TB_QUEUE_RE_MAIN_QUEUE_NAME:Main}"
topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}"
poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:60000}"
submit-strategy:
type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL
type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
# For BATCH only
batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch
processing-strategy:
type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:SKIP_ALL_FAILURES}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
@ -196,10 +196,10 @@ queue:
- name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
topic: "${TB_QUEUE_RE_HP_TOPIC:tb_rule_engine.hp}"
poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_HP_PARTITIONS:3}"
partitions: "${TB_QUEUE_RE_HP_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}"
submit-strategy:
type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_WITHIN_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL
type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
# For BATCH only
batch-size: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch
processing-strategy:
@ -208,6 +208,21 @@ queue:
retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
- name: "${TB_QUEUE_RE_SQ_QUEUE_NAME:SequentialByOriginator}"
topic: "${TB_QUEUE_RE_SQ_TOPIC:tb_rule_engine.sq}"
poll-interval: "${TB_QUEUE_RE_SQ_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_SQ_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RE_SQ_PACK_PROCESSING_TIMEOUT_MS:60000}"
submit-strategy:
type: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_BY_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_BY_ORIGINATOR, SEQUENTIAL_BY_TENANT, SEQUENTIAL
# For BATCH only
batch-size: "${TB_QUEUE_RE_SQ_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch
processing-strategy:
type: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
transport:
# For high priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"