Skip to main content

CSV

This article introduces you to the usage and type mapping of the CSV format.

Background Information

The CSV format allows reading and writing CSV data based on the CSV structure. Currently, the CSV structure is derived based on the table structure.

Example of Use

An example of constructing a table using Kafka and Avro format is as follows.

    CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost :9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
)

Configuration Options

ParameterDescriptionRequiredDefaultType
formatThe format to use for the declaration. When using the CSV format, the parameter value is csv.yesnoneString
csv.field-delimiterSpecify the field delimiter, only single characters can be used, the default is comma (,). Parameter values can be: The backslash character, used to specify special characters. For example, t represents a tab character. Unicode encoding, specifying special characters in plain SQL text. For example, ‘csv.field-delimiter’ = U&’0001’ represents the 0x01 character.no,String
csv.disable-quote-characterThe parameter values are as follows: true: Disallow quotes around quoted values. When disabled, the option csv.quote-character cannot be set. false (default): Allow quotes around quoted values.nofalseBoolean
csv.quote-characterSpecifies the quote character to enclose field values, defaults to double quotes (“).no“none”String
csv.allow-commentsThe parameter values are as follows: true: Comment lines are allowed to be ignored, and comment lines start with #. If commented lines are allowed, make sure csv.ignore-parse-errors is also turned on to allow empty lines. false (default): disable ignoring of commented lines.nofalseBoolean
csv.ignore-parse-errorsThe parameter values are as follows: true: When parsing an exception, skip the current field and set the field value to null. false (default): When parsing an exception, an error is thrown and the job fails to start.nofalseBoolean
csv.array-element-delimiterSpecifies the string separating array and row elements, defaults to a semicolon (;).no;String
csv.escape-characterSpecifies the escape character, disabled by default.nononeString
csv.null-literalSpecifies the string to convert null values to, disabled by default.nononeString
csv.write-bigdecimal-in-scientific-notationThe parameter values are as follows: true (default): Data of type Bigdecimal is represented as scientific notation. Example: 100000 is expressed as 1E+5. false: Data of Bigdecimal type remains as it is. Example: 100000 is still expressed as 100000.notrueBoolean

Type Mapping

Currently the structure of CSV is derived from the table structure. In Flink, CSV format data uses jackson databind API to parse CSV strings. The mapping relationship between Flink and CSV data types is as follows.

Flink SQL typeCSV type
CHAR/VARCHAR/STRINGstring
BOOLEANboolean
BINARY / VARBINARYstring with encoding: base64
DECIMALnumber
TINYINTnumber
SMALLINTnumber
INTnumber
BIGINTnumber
FLOATnumber
DOUBLEnumber
DATEstring with format: date
timestring with format: time
TIMESTAMPstring with format: date-time
INTERVALnumber
ARRAYarray
ROWobject

Type Mapping

For writing to object storage S3, currently it does not support writing files in CSV format. For specific reasons, see FLINK-30635.

note

This page is derived from the official Apache Flink® documentation.

Refer to the Credits page for more information.