I am trying to run a very simple Flink (Java) job that:

Creates an Iceberg JDBC catalog backed by PostgreSQL
Sets the Iceberg warehouse to the Hadoop FileSystem

The job builds successfully with Maven, but when I submit it to a local Flink (1.20.2) cluster it fails during the CREATE CATALOG statement.
My current files

pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>iceberg-s3-setup</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    <name>Iceberg S3 Setup Job</name>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.20.2</flink.version>
        <iceberg.version>1.10.0</iceberg.version>
        <hadoop.version>3.3.6</hadoop.version>
    </properties>

    <dependencies>
        <!-- Flink core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API bridge (required for StreamTableEnvironment) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Iceberg Flink runtime (matches Flink 1.20) -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-1.20</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <!-- Iceberg AWS bundle (contains S3FileIO) -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-aws</artifactId>
            <version>${iceberg.version}</version>
        </dependency>
        <!-- Hadoop AWS (for s3a://) -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Shade plugin to create an uber-jar (optional but recommended) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <!-- Optional: make the jar runnable directly -->
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.IcebergS3SetupJava</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The main class is com.example.IcebergS3SetupJava (src/main/java/com/example/IcebergS3SetupJava.java):
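For completeness, this is how I build and sanity-check the uber-jar (the `grep` is just my way of confirming the Iceberg classes were actually shaded in):

```shell
# Build the shaded uber-jar (the shade goal is bound to the package phase)
mvn clean package

# Optional sanity check: list jar entries and confirm Iceberg classes are inside
jar tf target/iceberg-s3-setup-1.0.0.jar | grep -m1 'org/apache/iceberg'
```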
package com.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IcebergS3SetupJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L); // 10 seconds

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Create an Iceberg JDBC catalog backed by PostgreSQL, with an HDFS warehouse
        tEnv.executeSql(
            "CREATE CATALOG iceberg_catalog WITH (\n"
                + "  'type' = 'iceberg',\n"
                + "  'catalog-type' = 'jdbc',\n"
                + "  'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog',\n"
                + "  'jdbc.uri' = 'jdbc:postgresql://<postgres_ip>:<postgres_port>/iceberg_catalog',\n"
                + "  'jdbc.user' = 'user',\n"
                + "  'jdbc.password' = 'pass',\n"
                + "  'warehouse' = 'hdfs://<my_ip>:8020/user/warehouse'\n"
                + ")"
        );

        // Create a sample table (`default` is a reserved keyword, so it is quoted)
        tEnv.executeSql(
            "CREATE TABLE iceberg_catalog.`default`.sample (\n"
                + "  id BIGINT COMMENT 'unique id',\n"
                + "  data STRING\n"
                + ") WITH (\n"
                + "  'format-version' = '2'\n"
                + ")"
        );

        System.out.println("Iceberg catalog and sample table created successfully!");

        // This job only runs DDL → call execute to make it run
        env.execute("Iceberg Catalog & Table Creation");
    }
}

Environment:
Flink 1.20.2
Iceberg 1.10.0
PostgreSQL 16
Java 11
Submit job
flink run -c com.example.IcebergS3SetupJava target/iceberg-s3-setup-1.0.0.jar
When I submit, the job fails with:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.runtime.util.HadoopUtils
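From what I have read in the Flink docs, this error usually means the Hadoop classes are not on the Flink classpath, and the suggested workaround is to export HADOOP_CLASSPATH before starting the cluster and submitting. I am not sure this applies to my setup, but this is the sketch I am considering (it assumes a local Hadoop installation whose `hadoop` binary is on the PATH, and that Flink runs on the same machine):

```shell
# Make the local Hadoop distribution visible to the Flink client and cluster;
# this must be set in the shell that starts the cluster AND the one that submits
export HADOOP_CLASSPATH=$(hadoop classpath)

# Restart the cluster so the TaskManagers pick up the new classpath, then resubmit
./bin/stop-cluster.sh && ./bin/start-cluster.sh
flink run -c com.example.IcebergS3SetupJava target/iceberg-s3-setup-1.0.0.jar
```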