CSV
This article describes how to use the CSV format and how CSV data types map to Flink SQL types.
Background Information
The CSV format reads and writes CSV data based on a CSV schema. Currently, the CSV schema is derived from the table schema.
Example of Use
The following example creates a table that uses the Kafka connector and the CSV format.
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'true',
  'csv.allow-comments' = 'true'
);
Configuration Options
Parameter | Description | Required | Default | Type |
---|---|---|---|---|
format | Specifies the format to use. To use the CSV format, set the value to csv. | yes | none | String |
csv.field-delimiter | Specifies the field delimiter. Only a single character can be used; the default is a comma (,). You can use a backslash to specify special characters, for example, \t represents a tab character. You can also use a Unicode escape to specify special characters in plain SQL text, for example, 'csv.field-delimiter' = U&'\0001' represents the 0x01 character. | no | , | String |
csv.disable-quote-character | Specifies whether to disable the quote character for enclosing field values. true: the quote character is disabled, and the option csv.quote-character cannot be set. false (default): field values can be enclosed in the quote character. | no | false | Boolean |
csv.quote-character | Specifies the quote character for enclosing field values. The default is the double quote ("). | no | " | String |
csv.allow-comments | Specifies whether to ignore comment lines. true: lines that start with # are treated as comments and ignored. If you enable this option, also enable csv.ignore-parse-errors so that empty lines are allowed. false (default): comment lines are not ignored. | no | false | Boolean |
csv.ignore-parse-errors | Specifies how parse errors are handled. true: when a parse error occurs, the current field is skipped and its value is set to null. false (default): when a parse error occurs, an error is thrown and the job fails to start. | no | false | Boolean |
csv.array-element-delimiter | Specifies the delimiter string that separates array elements and row elements. The default is a semicolon (;). | no | ; | String |
csv.escape-character | Specifies the escape character. Disabled by default. | no | none | String |
csv.null-literal | Specifies the string that represents a null value. Disabled by default. | no | none | String |
csv.write-bigdecimal-in-scientific-notation | Specifies whether BigDecimal data is written in scientific notation. true (default): BigDecimal data is written in scientific notation, for example, 100000 is written as 1E+5. false: BigDecimal data is written as is, for example, 100000 is written as 100000. | no | true | Boolean |
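As a minimal sketch of how several of these options can be combined, the following table definition reads and writes tab-separated records, treats the string NULL as a null value, and writes decimal values without scientific notation. The table name orders_tsv, its columns, and the topic name are hypothetical and used only for illustration; the option keys are the ones listed in the table above.

```sql
-- Hypothetical table; names, columns, and topic are illustrative assumptions.
CREATE TABLE orders_tsv (
  order_id BIGINT,
  amount DECIMAL(10, 2),
  note STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_tsv',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  -- Backslash escape: fields are separated by a tab character.
  'csv.field-delimiter' = '\t',
  -- The string NULL represents a null value.
  'csv.null-literal' = 'NULL',
  -- Write 100000 as 100000 rather than 1E+5.
  'csv.write-bigdecimal-in-scientific-notation' = 'false'
);
```

To use the 0x01 control character as the delimiter instead, the Unicode form described above would replace the delimiter line: 'csv.field-delimiter' = U&'\0001'.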
Type Mapping
Currently, the CSV schema is derived from the table schema. Flink uses the Jackson databind API to parse CSV strings. The following table shows the mapping between Flink SQL types and CSV types.
Flink SQL type | CSV type |
---|---|
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | string with encoding: base64 |
DECIMAL | number |
TINYINT | number |
SMALLINT | number |
INT | number |
BIGINT | number |
FLOAT | number |
DOUBLE | number |
DATE | string with format: date |
TIME | string with format: time |
TIMESTAMP | string with format: date-time |
INTERVAL | number |
ARRAY | array |
ROW | object |
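To illustrate how the composite types in this mapping relate to the csv.array-element-delimiter option, here is a minimal sketch of a table with an ARRAY column and a ROW column. The table name device_readings, its columns, and the topic name are hypothetical. Under these assumptions, a record such as 7,indoor;north,52.5;13.4 would be expected to map to device_id = 7, tags = ['indoor', 'north'], and location = (52.5, 13.4); treat that sample line as an illustration rather than verified output.

```sql
-- Hypothetical table; names, columns, and topic are illustrative assumptions.
CREATE TABLE device_readings (
  device_id BIGINT,
  -- Array elements are separated by the array element delimiter.
  tags ARRAY<STRING>,
  -- Row fields are separated by the same delimiter.
  location ROW<lat DOUBLE, lon DOUBLE>
) WITH (
  'connector' = 'kafka',
  'topic' = 'device_readings',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  -- Semicolon is the default; it is set explicitly here for clarity.
  'csv.array-element-delimiter' = ';'
);
```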
Limitations
Writing files in CSV format to S3 object storage is currently not supported. For details, see FLINK-30635.
This page is derived from the official Apache Flink® documentation.
Refer to the Credits page for more information.