Async Insert Mode
- RFC PR: datafuselabs/databend#5567
- Tracking Issue: datafuselabs/databend#4577
Summary
Async insert mode aims to improve the throughput of high rate of small inserts.
Motivation
When thousands of clients concurrently insert a small batch of data, each insert will be executed as follows:
Parser
-> Planner
-> Interpreter
-> Pipeline
It's inefficient because of I/O depth and cache locality.
To solve the problem, we want to buffer small inserts into batches in server which sacrifices tiny latency for better insert throughput, smaller block count and larger DataBlock
in storage.
After doing this, inserts into the same table will be parsed and planned individual. The insert data will be convert to DataBlock
and buffered. When some conditions are triggered, the buffered DataBlock
will be interpreted once which also is beneficial for pipelines.
Design
Buffer Structure
In order to buffer data of different tables. We need a HashMap
to save datablocks of different insert plan. The settings of inserts also should be considered. But the settings of QueryContext
take up big space which is unnecessary. Therefore, we save the changed settings. It's the settings which are different from default values of settings.
The key of HashMap:
#[derive(Clone)]
pub struct InsertKey {
plan: Arc<InsertPlan>,
// settings different with default settings
changed_settings: Settings,
}
The plan is the first plan arrived at the bucket of hashmap. For example there comes two inserts:
insert into test values(1);
insert into test select 1;
They are same plan because they have same schema and share one table. So the datablock of these two inserts will be put the same bucket. The plan of InsertKey is the plan insert into test values(1);
which is insignificant. Because we only need the plan to construct Interpreter
.
The Value of HashMap:
pub struct InsertData {
entries: Vec<EntryPtr>,
data_size: u64,
first_update: Instant,
last_update: Instant,
}
The entries
is array of Entry
:
pub struct Entry {
block: DataBlock,
finished: Arc<RwLock<bool>>,
notify: Arc<Notify>,
}
Each insert corresponds to each block
. The finished
and notify
is for query response. When server completed a batch of inserts, the entry will be finished and notified.
The data_size
is the memory size of entries
. The first_update
is the timestamp of the first insert into the bucket. The last_update
is the timestamp of the last insert into the bucket. They all are used to trigger the batch inserts.
Execution
Let's look at the AsyncInsertQueue
:
type Queue = HashMap<InsertKey, InsertData>;
type QueryIdToEntry = HashMap<String, EntryPtr>;
pub struct AsyncInsertQueue {
pub session_mgr: Arc<RwLock<Option<Arc<SessionManager>>>>,
runtime: Arc<Runtime>,
max_data_size: u64,
busy_timeout: Duration,
stale_timeout: Duration,
queue: Arc<RwLock<Queue>>,
current_processing_insert: Arc<RwLock<QueryIdToEntry>>,
}
impl AsyncInsertQueue {
pub fn try_create() -> Self {}
pub async fn start(self: Arc<Self>) {}
pub async fn push(self: Arc<Self>, plan_node: Arc<InsertPlan>, ctx: Arc<QueryContext>) -> Result<()> {}
pub async fn wait_for_processing_insert(self: Arc<Self>, query_id: String, time_out: Duration,) -> Result<()> {}
fn schedule(self: Arc<Self>, key: InsertKey, data: InsertData) {}
async fn process(self: Arc<Self>, key: InsertKey, data: InsertData) {}
fn busy_check(self: Arc<Self>) -> Duration {}
fn stale_check(self: Arc<Self>) {}
}
session_mgr
: needed because ofAsyncInsertQueue
needs aQueryContext
when it process a batch of data.runtime
: a global storage runtime to execute the insert task.max_data_size
: the maximum memory size of the buffered data collected per insert before being inserted.busy_timeout
: the maximum timeout in milliseconds since the first insert before inserting collected data.stale_timeout
: the maximum timeout in milliseconds since the last insert before inserting collected data.queue
: the buffer to cache batch inserts which is aHashMap
because we need to distinguish different inserts.current_processing_insert
: inserts which are processing and clients wait for the processing result. We useQueryId
as the key to distinguish different clients.
When databend-query
starts, AsyncInsertQueue
starts according to a config enable_async_insert
. If enable_async_insert
is true, the queue will call start
method. Every insert will call push
method with InsertPlan
and QueryContext
as inputs and call wait_for_processing_insert
to waiting. The queue receives many inserts and two ticker task busy_check
and stale_check
runs in the background. When specific condition is triggered, the queue will call schedule
to schedule the task of batch inserts and the data will be process
ed by the runtime executor.
Some Details
AsyncInsertQueue
is a global instance, so we better put it in the SessionManager
. But the queue also needs SessionManager
which is a circular dependency. Therefore, the session_mgr
of AsyncInsertQueue
must be Arc<RwLock<Option<Arc<SessionManager>>>>
, not Arc<SessionManager>
. We need to modify the AsyncInsertQueue
after SessionManager
has been initialized. Maybe we should use Weak
better here.
The method arguments of AsyncInsertQueue
is Arc<Self>
because tokio runtime needs a static lifetime of task.
Configs
enable_async_insert
: Enable the async insert mode int http protocolasync_insert_max_data_size
: The maximum memory size of the buffered data collected per insert before being inserted.async_insert_busy_timeout
: The maximum timeout in milliseconds since the first insert before inserting collected data.async_insert_stale_timeout
: The maximum timeout in milliseconds since the last insert before inserting collected data.wait_for_async_insert
: Enable waiting for processing of async insert.wait_for_async_insert_timeout
: The timeout in seconds for waiting for processing of async insert.
Unresolved questions
None
Future possibilities
None