Catalogs, Schemas, and Tables

This section describes how to create and manage catalogs, schemas, and tables in DataFusion. For those wanting to dive into the code quickly, please see the example.

General Concepts

Catalog provider lists, catalogs, schemas, and tables are organized in a hierarchy. A CatalogProviderList contains catalog providers, a catalog provider contains schemas, and a schema contains tables.
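To make the hierarchy concrete, here is a minimal sketch in plain Rust (standard library only). It models the three levels as nested HashMaps and resolves a dotted name by walking down one level at a time. The type aliases, the resolve function, and the naive split on '.' (quoted identifiers are not handled) are assumptions for illustration, not DataFusion's API. DataFusion itself fills in the missing parts of a partial name from its default catalog (datafusion) and default schema (public).

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the hierarchy. DataFusion uses
// CatalogProviderList / CatalogProvider / SchemaProvider trait objects;
// plain nested maps are used here only to show the lookup.
type Schema = HashMap<String, String>; // table name -> table description
type Catalog = HashMap<String, Schema>; // schema name -> schema
type CatalogList = HashMap<String, Catalog>; // catalog name -> catalog

/// Resolve `table`, `schema.table`, or `catalog.schema.table`,
/// filling any missing parts from the given defaults.
fn resolve<'a>(
    catalogs: &'a CatalogList,
    name: &str,
    default_catalog: &str,
    default_schema: &str,
) -> Option<&'a String> {
    let parts: Vec<&str> = name.split('.').collect();
    let (catalog, schema, table) = match parts.as_slice() {
        [t] => (default_catalog, default_schema, *t),
        [s, t] => (default_catalog, *s, *t),
        [c, s, t] => (*c, *s, *t),
        _ => return None, // more than three parts is not a valid name here
    };
    // One step per level, analogous to CatalogProviderList::catalog,
    // CatalogProvider::schema, and SchemaProvider::table.
    catalogs.get(catalog)?.get(schema)?.get(table)
}
```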

DataFusion comes with basic in-memory catalog functionality in the catalog module. You can use these in-memory implementations as is, or extend DataFusion with your own catalog implementations, for example ones based on local files or on files in remote object storage.

As with other concepts in DataFusion, you’ll implement various traits to create your own catalogs, schemas, and tables. The following sections describe the traits you’ll need to implement.

The CatalogProviderList trait has methods to register a new catalog, get a catalog by name, and list all catalogs. The CatalogProvider trait has methods to register a schema under a name, get a schema by name, list all schemas, and deregister a schema. The SchemaProvider trait, whose implementations are registered with a CatalogProvider, has methods to register a table under a name, get a table by name, list all tables, deregister a table, and check whether a table exists. The TableProvider trait has methods to scan the underlying data so that DataFusion can use it. The TableProvider trait is covered in more detail here.

In the following example, we’ll implement an in-memory catalog. We start with the SchemaProvider trait, since we need a schema provider to register with the CatalogProvider, and then implement the CatalogProvider itself. Finally, we implement CatalogProviderList to register the CatalogProvider.

Implementing MemorySchemaProvider

The MemorySchemaProvider is a simple implementation of the SchemaProvider trait. It stores its state (i.e., tables) in a DashMap, which backs its SchemaProvider implementation.

pub struct MemorySchemaProvider {
    tables: DashMap<String, Arc<dyn TableProvider>>,
}

tables is the key-value store described above: it maps each table name to its TableProvider. The underlying state could also be another data structure or a different storage mechanism, such as a file or a transactional database.
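As an example of a different underlying data structure, the sketch below keeps the same state in a standard-library RwLock<HashMap> instead of a DashMap. It uses a hypothetical Table trait as a stand-in for TableProvider so that it compiles without DataFusion. Its methods mirror the SchemaProvider methods described above, but it is not a SchemaProvider implementation.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for DataFusion's TableProvider trait.
trait Table: Send + Sync {
    fn name(&self) -> &str;
}

/// Schema state kept in a std RwLock<HashMap> instead of a DashMap.
#[derive(Default)]
struct LockedSchema {
    tables: RwLock<HashMap<String, Arc<dyn Table>>>,
}

impl LockedSchema {
    fn table_names(&self) -> Vec<String> {
        self.tables.read().unwrap().keys().cloned().collect()
    }

    fn table(&self, name: &str) -> Option<Arc<dyn Table>> {
        self.tables.read().unwrap().get(name).cloned()
    }

    /// Refuses to overwrite an existing name, like register_table.
    fn register_table(&self, name: String, table: Arc<dyn Table>) -> Result<(), String> {
        // Hold the write lock across the check and the insert so that
        // no other thread can register the same name in between.
        let mut tables = self.tables.write().unwrap();
        if tables.contains_key(&name) {
            return Err(format!("The table {name} already exists"));
        }
        tables.insert(name, table);
        Ok(())
    }

    fn deregister_table(&self, name: &str) -> Option<Arc<dyn Table>> {
        self.tables.write().unwrap().remove(name)
    }

    fn table_exist(&self, name: &str) -> bool {
        self.tables.read().unwrap().contains_key(name)
    }
}
```

One design difference worth noting: register_table here holds a single write lock across both the existence check and the insert, whereas the DashMap-based register_table in this section calls table_exist and then insert as two separate operations.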

Then we implement the SchemaProvider trait for MemorySchemaProvider.

#[async_trait]
impl SchemaProvider for MemorySchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        self.tables
            .iter()
            .map(|table| table.key().clone())
            .collect()
    }

    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        self.tables.get(name).map(|table| table.value().clone())
    }

    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> Result<Option<Arc<dyn TableProvider>>> {
        if self.table_exist(name.as_str()) {
            return Err(DataFusionError::Execution(format!(
                "The table {name} already exists"
            )));
        }
        Ok(self.tables.insert(name, table))
    }

    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        Ok(self.tables.remove(name).map(|(_, table)| table))
    }

    fn table_exist(&self, name: &str) -> bool {
        self.tables.contains_key(name)
    }
}

Without getting into a CatalogProvider implementation, we can create a MemorySchemaProvider and register TableProviders with it.

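let schema_provider = Arc::new(MemorySchemaProvider::new());
// Any TableProvider works here, for example a MemTable
let table_provider: Arc<dyn TableProvider> = todo!("create a table provider");

// register_table returns a Result; `?` propagates a duplicate-name error
schema_provider.register_table("table_name".to_string(), table_provider)?;

// table is async, so it must be awaited inside an async context
let table = schema_provider.table("table_name").await.unwrap();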

Asynchronous SchemaProvider

It’s often useful to fetch metadata about the tables in a schema from a remote source. For example, a schema provider could fetch table metadata from a remote database. To support this, the SchemaProvider trait has an asynchronous table method.

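Because table is declared async in the trait, implementations use the #[async_trait] attribute (MemorySchemaProvider above does too), and a remote implementation can await network calls inside table. Note that table_names and table_exist are synchronous, so a remote provider has to answer them without awaiting, for example from metadata it fetched earlier.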

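// RemoteSchemaProvider is a hypothetical type used for illustration
#[async_trait]
impl SchemaProvider for RemoteSchemaProvider {
    // as_any, table_names, and table_exist are omitted for brevity

    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        // Fetch metadata for `name` from the remote source (for example, a
        // remote database), then build and return a TableProvider from it
        todo!("fetch metadata for {name} from the remote source")
    }
}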

Implementing MemoryCatalogProvider

As mentioned, a CatalogProvider manages the schemas in a catalog. The MemoryCatalogProvider is a simple implementation of the CatalogProvider trait that stores schemas in a DashMap.

pub struct MemoryCatalogProvider {
    schemas: DashMap<String, Arc<dyn SchemaProvider>>,
}

With that, the CatalogProvider trait can be implemented.

impl CatalogProvider for MemoryCatalogProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        self.schemas.iter().map(|s| s.key().clone()).collect()
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.schemas.get(name).map(|s| s.value().clone())
    }

    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        Ok(self.schemas.insert(name.into(), schema))
    }

    fn deregister_schema(
        &self,
        name: &str,
        cascade: bool,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        // `cascade` controls whether a schema that still contains tables may
        // be dropped; without it, a non-empty schema is left in place.
        if let Some(schema) = self.schema(name) {
            if !cascade && !schema.table_names().is_empty() {
                return Err(DataFusionError::Execution(format!(
                    "Cannot drop schema {name} because it still contains tables"
                )));
            }
            Ok(self.schemas.remove(name).map(|(_, removed)| removed))
        } else {
            Ok(None)
        }
    }
}

Again, this is fairly straightforward, because an underlying key-value data structure stores the state.
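The two in-memory implementations handle duplicate names differently. MemoryCatalogProvider::register_schema returns whatever DashMap::insert returns, so registering an existing name replaces the schema and hands back the previous one. MemorySchemaProvider::register_table instead rejects an existing name with an error. The standard-library sketch below models both conventions with a hypothetical Registry type whose string values stand in for the provider trait objects.

```rust
use std::collections::HashMap;

// Hypothetical model: plain strings stand in for
// Arc<dyn SchemaProvider> / Arc<dyn TableProvider>.
#[derive(Default)]
struct Registry {
    entries: HashMap<String, String>,
}

impl Registry {
    /// Replace semantics, as in register_schema: the map's insert
    /// returns the previous value for the name, which is passed back.
    fn register_replacing(&mut self, name: &str, value: &str) -> Option<String> {
        self.entries.insert(name.to_string(), value.to_string())
    }

    /// Reject semantics, as in register_table: an existing name is an
    /// error and the stored value is left untouched.
    fn register_rejecting(&mut self, name: &str, value: &str) -> Result<(), String> {
        if self.entries.contains_key(name) {
            return Err(format!("{name} already exists"));
        }
        self.entries.insert(name.to_string(), value.to_string());
        Ok(())
    }
}
```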

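Implementing MemoryCatalogProviderList

The MemoryCatalogProviderList is a simple implementation of the CatalogProviderList trait. It stores catalogs in a DashMap, keyed by catalog name.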

pub struct MemoryCatalogProviderList {
    /// Collection of catalogs containing schemas and ultimately TableProviders
    pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
}

With that, the CatalogProviderList trait can be implemented.

impl CatalogProviderList for MemoryCatalogProviderList {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        self.catalogs.insert(name, catalog)
    }

    fn catalog_names(&self) -> Vec<String> {
        self.catalogs.iter().map(|c| c.key().clone()).collect()
    }

    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        self.catalogs.get(name).map(|c| c.value().clone())
    }
}

As with the other in-memory implementations, it maintains a mapping from each catalog’s name to its CatalogProvider.
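With all three levels in place, every table can be addressed as catalog.schema.table. The sketch below, again standard-library only, walks nested BTreeMaps the way catalog_names, schema_names, and table_names would be called in turn, and collects each fully qualified table name. The type aliases and the qualified_table_names function are hypothetical; BTreeMap is used only to keep the output order deterministic.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for CatalogProviderList -> CatalogProvider -> SchemaProvider.
type Schemas = BTreeMap<String, Vec<String>>; // schema name -> table names
type Catalogs = BTreeMap<String, Schemas>; // catalog name -> schemas

/// List every table as `catalog.schema.table`, walking the hierarchy
/// from the top level down.
fn qualified_table_names(catalogs: &Catalogs) -> Vec<String> {
    let mut names = Vec::new();
    for (catalog, schemas) in catalogs {
        for (schema, tables) in schemas {
            for table in tables {
                names.push(format!("{catalog}.{schema}.{table}"));
            }
        }
    }
    names
}
```

An empty catalog or schema simply contributes no names.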

Recap

To recap, you need to:

  1. Implement the TableProvider trait to create a table provider, or use an existing one.

  2. Implement the SchemaProvider trait to create a schema provider, or use an existing one.

  3. Implement the CatalogProvider trait to create a catalog provider, or use an existing one.

  4. Implement the CatalogProviderList trait to create a CatalogProviderList, or use an existing one.