1. Background
This article mainly shares ideas on parameter tuning and troubleshooting for continuous data import from Apache Flink® into StarRocks with flink-connector-starrocks, based on the official documentation.
2. Flink-connector-starrocks Import Logic
Under the hood, flink-connector-starrocks wraps Stream Load: when data is imported through flink-connector-starrocks, it is actually sent over the HTTP protocol. Keeping this in mind makes it much easier to locate and solve the problems you may encounter.
Currently, there are two strategies for Flink-connector-starrocks to trigger writes to StarRocks:
Flink-connector-starrocks 1.2.3 and earlier versions
- at-least-once (sink.semantic="at-least-once"), which is the current default strategy
When checkpointing is not enabled, a write to StarRocks is triggered by whichever of the following parameters meets its condition first (a configuration sketch follows the list):
sink.buffer-flush.max-bytes=94371840
sink.buffer-flush.max-rows=500000
sink.buffer-flush.interval-ms=300000
//If checkpointing is enabled, the execution.checkpointing.interval time also needs to be considered.
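For reference, in the DataStream API these thresholds are passed to the sink as properties via StarRocksSinkOptions. Below is a minimal sketch with the default values spelled out; the connection settings and class name are placeholders, not part of the official example.
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

public class AtLeastOnceFlushConfig {
    // Sketch only: spell out the default at-least-once flush thresholds.
    // Connection values are placeholders; a flush is triggered by whichever threshold is reached first.
    static StarRocksSinkOptions buildOptions() {
        return StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
                .withProperty("load-url", "127.0.0.1:8030")
                .withProperty("database-name", "demo")
                .withProperty("table-name", "users")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("sink.semantic", "at-least-once")
                .withProperty("sink.buffer-flush.max-bytes", "94371840")    // 90 MB
                .withProperty("sink.buffer-flush.max-rows", "500000")
                .withProperty("sink.buffer-flush.interval-ms", "300000")    // 5 minutes
                .build();
    }
}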
How can you check which condition actually triggered the write to StarRocks? Look at the Flink taskmanager logs. For example, the following write was triggered by sink.buffer-flush.interval-ms:
2022-12-08 10:34:03,670 INFO com.starrocks.connector.flink.manager.StarRocksSinkManager [] - StarRocks interval Sinking triggered.
2022-12-08 10:34:03,671 INFO com.starrocks.connector.flink.manager.StarRocksSinkManager [] - Async stream load: db[db] table[table] rows[30967] bytes[28062223] label[627b476b-de30-4aec-bfdf-f46d88782da7].
2022-12-08 10:34:03,671 INFO com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor [] - Start to join batch data: label[627b476b-de30-4aec-bfdf-f46d88782da7].
2022-12-08 10:34:03,682 INFO com.starrocks.connector.flink.manager.StarRocksStreamLoadVisitor [] - Executing stream load to: 'http://10.0.0.2:8030/api/db/table/_stream_load', size: '28093190', thread: 14342
Currently, the logs can distinguish which condition triggered the write based on the following keywords:
| Trigger condition | taskmanager log keywords |
| --- | --- |
| sink.buffer-flush.max-bytes, sink.buffer-flush.max-rows | StarRocks buffer Sinking triggered: db: [%s] table: [%s] rows[%d] label[%s] |
| sink.buffer-flush.interval-ms, execution.checkpointing.interval | StarRocks interval Sinking triggered (whether the checkpoint or sink.buffer-flush.interval-ms triggered the write can be determined from the interval between flushes) |
- exactly-once (sink.semantic="exactly-once")
After exactly-once is enabled, data is written to StarRocks on each checkpoint cycle (see the sketch below):
execution.checkpointing.interval: 300000
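A minimal sketch of wiring this up in the DataStream API: enabling checkpointing sets the write cycle, and sink.semantic selects exactly-once. The checkpoint interval, connection values, and class name are illustrative placeholders.
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With exactly-once, data is flushed to StarRocks on each checkpoint,
        // so the checkpoint interval effectively controls the write frequency.
        env.enableCheckpointing(300000); // execution.checkpointing.interval = 300000 ms

        StarRocksSinkOptions options = StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
                .withProperty("load-url", "127.0.0.1:8030")
                .withProperty("database-name", "demo")
                .withProperty("table-name", "users")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("sink.semantic", "exactly-once")
                .build();
        // Attach the sink with StarRocksSink.sink(schema, options, rowBuilder)
        // as shown in the demo in Section 5, then call env.execute().
    }
}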
Flink-connector-starrocks 1.2.4 and later versions
Starting from flink-connector-starrocks 1.2.4, the sink.buffer-flush.max-rows strategy has been removed.
- at-least-once (sink.semantic="at-least-once"), which is the current default strategy
When checkpointing is not enabled, a write to StarRocks is triggered by whichever of the following parameters meets its condition first:
sink.buffer-flush.max-bytes=94371840
sink.buffer-flush.interval-ms=300000
//If checkpointing is enabled, the execution.checkpointing.interval time also needs to be considered.
How can you check which condition actually triggered the write to StarRocks? Look at the Flink taskmanager logs; currently they can distinguish the trigger based on the following keywords:
| Trigger condition | taskmanager log keywords |
| --- | --- |
| sink.buffer-flush.max-bytes | Cache full, wait flush, currentBytes |
| execution.checkpointing.interval | Stream load manager flush |
- exactly-once (sink.semantic="exactly-once")
- For StarRocks >= 2.4.0, exactly-once is implemented by default on top of StarRocks' transactional Stream Load: the data between two checkpoints is committed as a single transaction, but it is still split into multiple sends according to the buffer configuration
- For StarRocks < 2.4, exactly-once is implemented on top of ordinary Stream Load: the data between two checkpoints is also committed as one transaction, but it is written only once per checkpoint cycle
3. Supported Import Data Formats
- CSV (default)
'sink.properties.column_separator' = '\\x01',
'sink.properties.row_delimiter' = '\\x02'
- JSON
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true'
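These format options are ordinary Stream Load properties, so in the DataStream API they can also be passed through withProperty. A minimal sketch for the JSON case, with placeholder connection settings and class name:
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

public class JsonFormatSketch {
    // Send the buffered rows to Stream Load as JSON instead of the default CSV.
    static StarRocksSinkOptions buildOptions() {
        return StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
                .withProperty("load-url", "127.0.0.1:8030")
                .withProperty("database-name", "demo")
                .withProperty("table-name", "users")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .build();
        // For the default CSV format, set 'sink.properties.column_separator' and
        // 'sink.properties.row_delimiter' instead (e.g. "\\x01" and "\\x02").
    }
}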
4. Parameter Configuration
Only commonly used configurations are listed here; for others, please refer to the official documentation.

| Configuration Item | Default Value | Type | Description |
| --- | --- | --- | --- |
| sink.semantic | at-least-once | String | at-least-once or exactly-once |
| sink.buffer-flush.max-bytes | 94371840 (90MB) | String | The maximum buffered data size for a single flush; once it is reached, a write to StarRocks is triggered (for details, see Section 2, "Flink-connector-starrocks Import Logic"). Value range for flink-connector-starrocks 1.2.3 and earlier: [64MB, 1.9GB]; for 1.2.4 and later: [64MB, 10GB]. |
| sink.buffer-flush.max-rows | 500000 | String | The maximum number of buffered rows for a single flush; once it is reached, a write to StarRocks is triggered (see Section 2). Value range: [64,000, 5,000,000]. |
| sink.buffer-flush.interval-ms | 300000 | String | The flush interval; once it elapses, a write to StarRocks is triggered (see Section 2). Value range: [1000ms, 3600000ms]. |
| sink.properties.* | NONE | String | Stream Load parameters for the import. For example, -H "columns: k1, k2, k3" in the Stream Load header corresponds to 'sink.properties.columns' = 'k1, k2, k3' in flink-connector-starrocks; for other parameters, refer to STREAM LOAD. Starting from StarRocks 2.4, flink-connector-starrocks supports partial column updates for the Primary Key model via 'sink.properties.partial_update' = 'true'. |
| sink.properties.ignore_json_size | false | String | Supported since StarRocks 2.1; ignores the 100MB limit on JSON data per import, which puts extra pressure on memory. If enabled, monitor the memory load. |
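As a hedged illustration of the partial-update property above (StarRocks 2.4+, Primary Key model): the sketch below assumes the columns being written are also declared via sink.properties.columns, and the table, column names, and class name are hypothetical.
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

public class PartialUpdateSketch {
    // Assumption for illustration: only user_id and name are written; other columns keep their existing values.
    static StarRocksSinkOptions buildOptions() {
        return StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
                .withProperty("load-url", "127.0.0.1:8030")
                .withProperty("database-name", "demo")
                .withProperty("table-name", "users")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("sink.properties.partial_update", "true")
                .withProperty("sink.properties.columns", "user_id,name")  // assumed column list
                .build();
    }
}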
5. Best Practices
1. How do you configure import parameters and parallelism reasonably to achieve relatively high import efficiency?
Based on the analysis of user cases we have encountered so far, the main reasons for the low writing efficiency of flink-connector-starrocks are as follows:
- Every import into StarRocks is a transaction; a high import frequency puts pressure on compaction and on transaction processing
- The Flink sink parallelism is too high
- A small amount of data is imported per write (small batch size)
The optimizations for the above issues mainly revolve around accumulating larger batches for each import:
- When at-least-once is enabled, try to increase the amount of data imported per write and reduce the concurrency and frequency
Flink-connector-starrocks 1.2.3 and earlier versions
Search for the following keywords in the taskmanager logs to check which trigger logic is in effect and how long the interval between flushes is (with multiple parallel subtasks, check the trigger interval for the corresponding thread number):
| Trigger condition | taskmanager log keywords |
| --- | --- |
| sink.buffer-flush.max-bytes, sink.buffer-flush.max-rows | StarRocks buffer Sinking triggered: db: [%s] table: [%s] rows[%d] label[%s] |
| sink.buffer-flush.interval-ms, execution.checkpointing.interval | StarRocks interval Sinking triggered (whether the checkpoint or sink.buffer-flush.interval-ms triggered the write can be determined from the interval between flushes) |
Since a write to StarRocks is triggered as soon as any one of the following parameters meets its condition, it is recommended to adjust them together; in general, writes should be triggered at most once every 10 seconds (i.e., leave at least 10 seconds between flushes):
sink.buffer-flush.max-bytes
sink.buffer-flush.max-rows
sink.buffer-flush.interval-ms
//If checkpointing is enabled, the execution.checkpointing.interval time also needs to be considered.
Flink-connector-starrocks 1.2.4 and later versions
Search for the following keywords in the taskmanager logs to check which trigger logic is in effect and how long the interval between flushes is (with multiple parallel subtasks, check the trigger interval for the corresponding thread number):
| Trigger condition | taskmanager log keywords |
| --- | --- |
| sink.buffer-flush.max-bytes | Cache full, wait flush, currentBytes |
| execution.checkpointing.interval | Stream load manager flush |
| sink.buffer-flush.interval-ms | No dedicated keyword at present; search for _stream_load in the log |
Since a write to StarRocks is triggered as soon as any one of the following parameters meets its condition, it is recommended to adjust them together; in general, writes should be triggered at most once every 10 seconds (i.e., leave at least 10 seconds between flushes):
sink.buffer-flush.max-bytes
sink.buffer-flush.interval-ms
//If checkpointing is enabled, the execution.checkpointing.interval time also needs to be considered.
- When exactly-once is enabled, try to increase the checkpoint interval as much as possible
- Flink sink parallelism: keep it within 8 * the number of BEs at most; if the data volume is relatively small, a parallelism of 1 is sufficient (see the configuration sketch below)
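Putting best practice 1 together, here is a sketch of a batched, low-concurrency sink configuration. All numeric values are illustrative, not official recommendations; adjust them against your actual throughput and the trigger keywords in the taskmanager logs. Connection settings and class name are placeholders.
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchedSinkSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // If checkpointing is enabled, it also triggers flushes, so keep its interval reasonably large.
        env.enableCheckpointing(60000);

        StarRocksSinkOptions options = StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
                .withProperty("load-url", "127.0.0.1:8030")
                .withProperty("database-name", "demo")
                .withProperty("table-name", "users")
                .withProperty("username", "root")
                .withProperty("password", "")
                // Flush no more often than every 10 seconds and let data accumulate up to the byte limit.
                .withProperty("sink.buffer-flush.interval-ms", "10000")
                .withProperty("sink.buffer-flush.max-bytes", "94371840")
                // Keep sink concurrency low; 1 is enough for small data volumes.
                .withProperty("sink.parallelism", "1")
                .build();
        // Attach with StarRocksSink.sink(schema, options, rowBuilder) as in the demo below.
    }
}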
2. If the import fails, how can I troubleshoot it?
- Confirm that the ports are correct
- Check the Flink taskmanager logs for the specific error messages
- Check whether the Stream Load task was initiated normally
3. How to write code to implement upsert and delete operations for the primary key model?
The key part is the logic that sets slots[slots.length - 1] in the snippet below. Note: this is only a demo; in actual production, refer to Best Practice 1 to batch writes to StarRocks reasonably.
package com.starrocks.flink;
...
public class Bean {
public static void main(String[] args) throws Exception {
class RowData {
public int userId;
public String name;
public String email;
public String address;
public String opType;
public RowData(int userId, String name, String email, String address, String opType) {
...
}
}
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<RowData> rowDataDataStreamSource = env.fromElements(
new RowData[]{
new RowData(99, "stephen", "stephen@example.com", "N.35", "I"),
...
}
);
DataStreamSink<RowData> rowDataDataStreamSink = rowDataDataStreamSource.addSink(
StarRocksSink.sink(
// the table structure
TableSchema.builder()
.field("user_id", DataTypes.INT().notNull())
...
.primaryKey("user_id")
.build(),
// the sink options
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
...
// set the slots with streamRowData
(slots, streamRowData) -> {
slots[0] = streamRowData.getUserId();
slots[1] = streamRowData.getName();
slots[2] = streamRowData.getEmail();
slots[3] = streamRowData.getAddress();
// slots[slots.length - 1] = StarRocksSinkOP.UPSERT.ordinal(); // The Primary Key model requires specifying whether each write is an upsert or a delete; always sending upsert would look like this.
// If the raw data contains a field indicating the operation type, the decision can be made per row.
// Here the field is opType, with values such as "I", "U", or "D":
if ("D".equals(streamRowData.getType())) {
    slots[slots.length - 1] = StarRocksSinkOP.DELETE.ordinal();
} else {
    slots[slots.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
}
}
)
);
env.execute();
}
}
The complete code is as follows:
package com.starrocks.flink;
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.row.sink.StarRocksSinkOP;
import com.starrocks.connector.flink.table.StarRocksDataType;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
public class Bean {
public static void main(String[] args) throws Exception {
class RowData {
public int userId;
public String name;
public String email;
public String address;
public String opType;
public RowData(int userId, String name, String email, String address, String opType) {
this.userId = userId;
this.name = name;
this.email = email;
this.address = address;
this.opType = opType;
}
public Object getUserId() {
return this.userId;
}
public Object getName() {
return this.name;
}
public Object getEmail() { return this.email; }
public Object getAddress() { return this.address; }
public Object getType() { return this.opType; }
}
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<RowData> rowDataDataStreamSource = env.fromElements(
new RowData[]{
new RowData(99, "stephen", "stephen@example.com", "N.35", "I"),
new RowData(99, "stephen", "stephen@example.com", "N.36","D"),
new RowData(99, "stephen", "stephen1@example.com", "N.37", "U"),
new RowData(98, "kobe","kobe@example.com", "N.36","I")
}
).setParallelism(1);
DataStreamSink<RowData> rowDataDataStreamSink = rowDataDataStreamSource.addSink(
StarRocksSink.sink(
// the table structure
TableSchema.builder()
.field("user_id", DataTypes.INT().notNull())
.field("name", DataTypes.VARCHAR(50))
.field("email", DataTypes.VARCHAR(50))
.field("address", DataTypes.VARCHAR(50))
.primaryKey("user_id")
.build(),
// the sink options
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
.withProperty("load-url", "127.0.0.1:8030")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("table-name", "users")
.withProperty("database-name", "demo")
.withProperty("sink.properties.column_separator", "\\x01")
.withProperty("sink.properties.row_delimiter", "\\x02")
.withProperty("sink.parallelism", "1")
.build(),
// set the slots with streamRowData
(slots, streamRowData) -> {
slots[0] = streamRowData.getUserId();
slots[1] = streamRowData.getName();
slots[2] = streamRowData.getEmail();
slots[3] = streamRowData.getAddress();
// slots[slots.length - 1] = StarRocksSinkOP.UPSERT.ordinal(); // The Primary Key model requires specifying whether each write is an upsert or a delete; always sending upsert would look like this.
// If the raw data contains a field indicating the operation type, the decision can be made per row.
// Here the field is opType, with values such as "I", "U", or "D":
if ("D".equals(streamRowData.getType())) {
    slots[slots.length - 1] = StarRocksSinkOP.DELETE.ordinal();
} else {
    slots[slots.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
}
}
)
);
env.execute();
}
}
Create the corresponding table in StarRocks:
CREATE TABLE `users` (
`user_id` bigint(20) NOT NULL COMMENT "",
`name` varchar(65533) NOT NULL COMMENT "",
`email` varchar(65533) NULL COMMENT "",
`address` varchar(65533) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`user_id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1;
4. Combine with CDC to synchronize multiple data sources to StarRocks in real time
Currently, real-time synchronization of data from MySQL, PostgreSQL, Oracle, Hive, SQL Server, and TiDB to StarRocks is already supported.
For usage, refer to the official documentation on real-time synchronization from MySQL (Flink_cdc_load).
For real-time synchronization from other databases, download the corresponding CDC JAR package (https://github.com/ververica/flink-cdc-connectors/releases) and use it together with flink-connector-starrocks.
6. FAQ
Q: An error occurred while importing JSON data: "The size of this batch exceed the max size [104857600] of json type data data [ 118170895 ]. Set ignore_json_size to skip the check, although it may lead huge memory consuming."
A: The load exceeds the 100MB limit on a single JSON import, which can be resolved by configuring 'sink.properties.ignore_json_size' = 'true'. However, this puts pressure on memory, so it is recommended to monitor the memory load of the BE nodes after the adjustment.
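A sketch of adding that property in the DataStream API; the connection settings and class name are placeholders:
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

public class IgnoreJsonSizeSketch {
    // Skip the 100 MB per-load size check for JSON data; watch BE memory after enabling this.
    static StarRocksSinkOptions buildOptions() {
        return StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")
                .withProperty("load-url", "127.0.0.1:8030")
                .withProperty("database-name", "demo")
                .withProperty("table-name", "users")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                .withProperty("sink.properties.ignore_json_size", "true")
                .build();
    }
}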
Q: Updates are not applied when using flink-cdc + flink-connector-starrocks
A: Confirm that the versions of Flink and Flink CDC match; refer to CDC Connectors for Apache Flink.