Ancon ParkyDB

ParkyDB - a block-based, linkable and verifiable document database -- JavaScript reference implementation


Ancon ParkyDB

A data mesh database using Web 3.0 technology

npm i parkydb

Note: Requires Node v18 and up for development

More about data mesh architecture

Block based data layered abstraction

KV Layer - Layer 0

Stores key as cid and value as block:

{
   "key": "cid",
   "dag": ...blob...,
   "db": ...blob...,
   "index": ...blob...,
   "gqls": ...blob...,
   "jsonschema": ...blob...
}

Linkable and Verifiable Layer - Layer 1

Stores as an IPLD Multiformats DAG block. Input must be a JSON payload. Can support either CBOR or JSON. This layer keeps data immutable (no mutations allowed) and uses special directives with the query layer.

Document Layer - Layer 2

Stores as JSON. Input must be a JSON payload. Used for queries only and represents a snapshot of the immutable data in the DAG.

Query and index Layer - Layer 3

Stores as a MiniSearch filter index. Input must be a JSON payload. Used for search only and represents the index of the JSON payload; the @filter GraphQL directive will enable filtering. Add GraphQL support with TypeGraphQL to query the document layer.

Run tests

We use the Ava test framework.

npm test

Note: Enable OpenSSL legacy support.

export NODE_OPTIONS="--openssl-legacy-provider"

API v1.1.0

Store

import { ParkyDB } from 'parkydb'

// Instantiate new DB instance
const db = new ParkyDB('northwind')
await db.initialize()

// Writes a DAG JSON block
const id = await db.putBlock(payload)

// Gets block from KV layer
const res = await db.get(id)

// Queries using document layer
const obs$ = await db.queryBlocks((blocks) => {
    return () => blocks.where({ cid: '' })
});

// Queries with GraphQL
const q = await db.query({
    query: `
    query{
       block(cid: "${id}") {
         network
         key
       }
    }   
    `,
  })

// Query direct to the document layer
db.getBlocksByTableName$('blockdb', (b) => {
      return () =>
        b.where({ 'document.kind': 'StorageAsset' }).limit(limit).toArray()
    })

GraphQL

Create type definitions

// StorageKind.ts
import {} from 'class-validator'
import { Field, InterfaceType, ObjectType } from 'type-graphql'

/**
 * Base GraphQL object type shared by the block types declared below.
 * Every subclass inherits the `kind`, `tag` and `ref` string fields.
 */
@ObjectType()
export class Document {
  // The document-layer queries in this README filter on this value (`document.kind`).
  @Field()
  kind!: string

  @Field()
  tag!: string

  @Field()
  ref!: string
}

/**
 * Metadata asset - Store locally.
 * GraphQL object type returned by the `asset` and `assets` queries of
 * `StorageAssetResolver`.
 */
@ObjectType()
export class StorageAsset extends Document{
  @Field()
  name!: string

  // NOTE(review): `kind` is already declared on `Document`; this redeclaration
  // looks redundant — confirm before removing.
  @Field()
  kind!: string

  @Field()
  timestamp!: number

  @Field()
  description!: string

  @Field()
  image!: string

  // List of strings: the element type is spelled out explicitly via the type function.
  @Field(type => [String])
  sources!: string[]

  @Field()
  owner!: string
}

/**
 * GraphQL object type wrapping a `StorageAsset` (`content`) together with
 * `signature`, `digest`, `timestamp` and `issuer` fields.
 */
@ObjectType()
export class StorageBlock extends Document{
  // Nested object field, so the GraphQL type is given explicitly.
  @Field(type => StorageAsset)
  content!: StorageAsset
  @Field()
  signature!: string // Either Waku+Web3 EIP712 or eth_signMessage
  @Field()
  digest!: string
  @Field()
  timestamp!: number
  @Field()
  issuer!: string
}

/**
 * GraphQL object type for a block identified by a `cid`, on top of the
 * fields inherited from `Document`.
 */
@ObjectType()
export class IPFSBlock extends Document{
  // `@Field()` is needed for the property to be part of the GraphQL schema;
  // without it `cid` is a plain class property that cannot be queried.
  @Field()
  cid!: string
}

/**
 * GraphQL object type adding a single `entries` string field to the fields
 * inherited from `Document`.
 */
@ObjectType()
export class ConfigBlock extends Document{
  @Field()
  entries!: string
}

/**
 * GraphQL object type describing an address entry: the `address` itself, a
 * `resolver` string and a `type` discriminator.
 */
@ObjectType()
export class AddressBlock extends Document{
  @Field()
  address!: string

  @Field()
  resolver!: string

  // String-literal union: restricts the accepted values at the TypeScript level.
  // NOTE(review): presumably exposed to GraphQL as a plain String — confirm.
  @Field()
  type!:
    | 'erc20'
    | 'erc721'
    | 'smart contract'
    | 'eoa'
    | 'uri'
    | 'phone'
    | 'email'
    | 'gps'
    | 'did'
    | 'ens'
}

/**
 * GraphQL object type exposing a `cid` and a `topic` string field on top of
 * the fields inherited from `Document`.
 */
@ObjectType()
export class AnconBlock extends Document{
  @Field()
  cid!: string
  @Field()
  topic!: string
}

/**
 * GraphQL object type for an ERC-721 related block. All fields are strings:
 * transaction id, metadata, token address and id, chain id, and the minter
 * and owner addresses.
 */
@ObjectType()
export class ERC721Block extends Document{
  @Field()
  txid!: string
  @Field()
  metadata!: string
  @Field()
  tokenAddress!: string
  @Field()
  tokenId!: string
  @Field()
  chainId!: string
  @Field()
  minterAddress!: string
  @Field()
  ownerAddress!: string
}

Create resolver

// StorageAssetResolver.ts
import 'reflect-metadata'

import {
  Arg,
  Args,
  ArgsType,
  Authorized,
  Ctx,
  Field,
  Int,
  Mutation,
  Query,
  Resolver,
} from 'type-graphql'
import { ParkyDB } from '../core/db'
import { ServiceContext } from '../interfaces/ServiceContext'
import { StorageAsset } from '../interfaces/StorageKind'

/**
 * Arguments accepted by the `assets` query.
 */
@ArgsType()
class StorageAssetArgs {
  // Optional Int argument; falls back to 10 when the caller omits it.
  @Field((type) => Int, { defaultValue: 10, nullable: true })
  limit!: number
}

/**
 * GraphQL resolver exposing read-only queries for `StorageAsset` documents.
 */
@Resolver()
export class StorageAssetResolver {
  /**
   * Looks up a single asset by id through `ctx.db.get`.
   *
   * @throws Error when the lookup yields `undefined`.
   */
  @Query((returns) => StorageAsset)
  async asset(@Arg('id') id: string, @Ctx() ctx: ServiceContext) {
    const found = await ctx.db.get(id)
    if (found !== undefined) {
      return found
    }
    throw new Error('Not found ' + id)
  }

  /**
   * Lists at most `limit` rows of the `blockdb` table whose `document.kind`
   * is `StorageAsset`.
   */
  @Query((returns) => [StorageAsset])
  async assets(
    @Args() { limit }: StorageAssetArgs,
    @Ctx() ctx: ServiceContext,
  ) {
    return ctx.db.getBlocksByTableName$(
      'blockdb',
      (table) => () =>
        table.where({ 'document.kind': 'StorageAsset' }).limit(limit).toArray(),
    )
  }
}

Create a resolvers index file

// index.ts
import { BlockValueResolver } from './BlockValueResolver'
import { StorageAssetResolver } from './StorageAssetResolver'

// Resolver classes handed to ParkyDB through the `graphql.resolvers` option of `initialize`.
export const defaultResolvers = [
  BlockValueResolver,
  StorageAssetResolver,
] as const


Load into ParkyDB

await this.db.initialize({
  graphql: { resolvers: defaultResolvers }, 
  enableSync: true,
  wakuconnect: {
    bootstrap: { peers: [peer] },
  },
  withWallet: {
    password: '', /// Not used
  },
  withWeb3: {
    provider: web3provider,
    defaultAddress: identity.address,
  },       
  withIpfs: {
    gateway: 'https://ipfs.infura.io',
    api: 'https://ipfs.infura.io:5001',
  },
})

Query

  const q = await db.query({
    query: `
    query{
      block(cid: "${id}"){
        cid,
        document{
          kind
        }
      }
    }   
    `,
  })

  t.is(q.data.block.cid, 'baguqeerabve7ug2qddskk3mpomdt3xdnhvh53jvmca7qh43p36y5hfoassoq')

Topics and Subscriptions

import { ParkyDB } from 'parkydb'

// Instantiate new DB instance
const db = new ParkyDB('data-union')
// Browsers can only support web sockets connections with Waku v2
const peer =
    '/ip4/0.0.0.0/tcp/8000/wss/p2p/...'

  // typegraphql resolvers
  await this.db.initialize({
          graphql: { resolvers: defaultResolvers }, 
          enableSync: true,
          wakuconnect: {
            bootstrap: { peers: [peer] },
          },
          withWallet: {
            password: '', /// Not used
          },
          withWeb3: {
            provider: web3provider,
            defaultAddress: identity.address,
          },       
          withIpfs: {
            gateway: 'https://ipfs.infura.io',
            api: 'https://ipfs.infura.io:5001',
          },
        })
const topic = `/anconprotocol/1/marketplace/ipld-dag-json`

// Writes a DAG JSON block
const id = await db.putBlock({...payload, topic})

// Fetch an existing DAG block
const res = await db.get(id)

// ============================================
// Create a key exchange and encrypted topic
// ============================================
// Keys
const ecdsa = EthCrypto.createIdentity()

// Subscriber codec
const receiverCodec = {
  name: 'cbor',
  code: '0x71',
  encode: async (obj) => encode(obj),
  decode: async (buffer) => {
    const cipher = await EthCrypto.cipher.parse(decode(buffer))

    const plain = await EthCrypto.decryptWithPrivateKey(
      ecdsa.privateKey,
      cipher
    )

    return JSON.parse(plain)
  },
}

// Main topic
const pubsub = await this.db.createTopicPubsub(this.defaultTopic, {
  blockCodec: receiverCodec,
  canSubscribe: true,
  isKeyExchangeChannel: false,
  canPublish: true,
  isCRDT: false,
})
      
pubsub.onBlockReply$.subscribe(async (v) => {
  // custom logic
  await this.db.putBlock(v.decoded.payload)
})

// default block codec
const blockCodec = {
  name: 'cbor',
  code: '0x71',
  encode: async (obj) => encode(obj),
  decode: (buffer) => decode(buffer),
}


// emits key exchange public key
// NOTE(review): `w` is not defined in this snippet — presumably the key pair
// (identity) whose public key is being shared; confirm.
await this.db.emitKeyExchangePublicKey(
  this.keyExchangeTopic,
  {
    blockCodec,
    // disables user signing requests when isCRDT is set to false
    isCRDT: false,
    pk: w.privateKey,
    pub: w.publicKey,
  }
)

// requests key exchange public key
const kex = await this.db.requestKeyExchangePublicKey(
  `/parkydb/1/keyex/cbor`,
  {
    blockCodec,
  }
)
// obtains encryption public key
const sub = kex.subscribe(async (encryptionPubKey: any) => {
  const encBlockCodec = {
    name: 'cbor',
    code: '0x71',
    encode: async (obj) => {
      const enc = await EthCrypto.encryptWithPublicKey(
        encryptionPubKey,
        JSON.stringify(obj)
      )

      const x = await EthCrypto.cipher.stringify(enc)
      return encode(x)
    },
  }

  // custom logic, create another topic or convert observable to promise
})

Wallet

import { ParkyDB } from 'parkydb'

// Instantiate new DB instance
const db = new ParkyDB()
await db.initialize({
  // withWeb3 for interactive usage, e.g. browsers, smart phones.
  withWeb3: {
    // Web3Provider is a class, so it is constructed with `new`; the injected
    // wallet provider lives on the browser's global `window` object.
    provider: new ethers.providers.Web3Provider(window.ethereum),
    pubkey,
    pubkeySig,
    defaultAddress,
  },
  // withWallet useful for backend use cases (e.g. NestJS)
  // Remember these values come from environment variables, CLI or UI, DO NOT hardcode when implementing
  withWallet: {
    password: '',
    // Note: Invented this mnemonic rap, 12 words, as my way to protest #WARINUKRAINE
    seed: 'lumber brown jack house bomb cluster star method guard against war peace',
  }
})

// Where `db.wallet` is the MetaMask keyring controller. See https://github.com/MetaMask/KeyringController
// ParkyDB has an Ed25519 implementation for DID and HPKE use cases
await db.wallet.addNewKeyring('Ed25519', [
  'c87509a1c067bbde78beb793e6fa76530b6382a4c0241e5e4a9ec0a0f44dc0d3',
])

// Wallet is used internally or by setting options to specific usage. See Protocols for how to encrypt and sign.

WebAuthn

import { WebauthnHardwareClient } from 'parkydb/lib/core/webauthnClient'
import { WebauthnHardwareAuthenticate } from 'parkydb/lib/core/webauthnServer'


// Load the block that is going to be signed
const model = await this.db.get(cid, null)

// Fido2 server settings
const fido2server = new WebauthnHardwareAuthenticate()
fido2server.initialize({
  rpId: 'localhost',
  rpName: 'du.',
  rpIcon: '',
  attestation: 'none',
  authenticatorRequireResidentKey: false,
  authenticatorUserVerification: 'required',
})

// Fido2 client settings
const fido2client = new WebauthnHardwareClient(fido2server, this.db)
const origin = window.location.origin
const res = await fido2client.registerSign(
  origin,
  model.cid,
  this.defaultAddress(),
  model.dag.bytes,
  // Callback invoked with the values listed below
  (args) => {
     const {
      // publicKey as Uint8Array
      publicKey,
      // publicKey as JWK
      publicKeyJwk,
      // previous counter
      prevCounter,
      // authenticator data
      authnrData,
      // client data JSON
      clientData,
    } = args
  },
  // keepSigning
  // NOTE(review): passed positionally because JavaScript calls have no named
  // arguments — confirm the parameter position against `registerSign`.
  false,
)

DeFi example

// Example: three peers (alice, bob, charlie) each open a pubsub topic for one
// chain (BSC, Ethereum, Polygon) and store the new blocks received from that
// chain's RPC endpoint under the matching topic.
test('find multichain tx by sender', async (t) => {
  const {
    alice,
    bob,
    charlie,
    consumer,
  }: {
    alice: ParkyDB
    bob: ParkyDB
    charlie: ParkyDB
    consumer: ParkyDB
  } = t.context as any

  // Unlock each wallet and take its first account
  await alice.wallet.submitPassword(`qwerty`)
  let accounts = await alice.wallet.getAccounts()
  const accountA = accounts[0]

  await bob.wallet.submitPassword(`zxcvb`)
  accounts = await bob.wallet.getAccounts()
  const accountB = accounts[0]

  await charlie.wallet.submitPassword(`a1d2f3f4`)
  accounts = await charlie.wallet.getAccounts()
  const accountC = accounts[0]

  await consumer.wallet.submitPassword(`mknjbhvgv`)
  accounts = await consumer.wallet.getAccounts()
  const accountConsumer = accounts[0]

  // Codec shared by the three topics
  const blockCodec = {
    name: 'eth-block',
    code: '0x90',
    encode: (obj: any) => encodeDagEth(obj),
    decode: (buffer: any) => decodeDagEth(buffer),
  }

  const topicBSC = `/bsc/1/new_blocks/dageth`
  const topicEthereum = `/ethereum/1/new_blocks/dageth`
  const topicPolygon = `/polygon/1/new_blocks/dageth`

  // Aggregate from BSC, Ethereum and Polygon any Transfer to x address
  // Then pipe calls to Discord channel and an arbitrage bot using a webhook (POST)
  const pubsubAlice = await alice.createTopicPubsub(topicBSC, {
    from: accountA,
    middleware: {
      incoming: [tap()],
      outgoing: [tap()],
    },
    blockCodec,
    canSubscribe: true,
    canPublish: true,
    isCRDT: false,
  })
  const pubsubBob = await bob.createTopicPubsub(topicEthereum, {
    from: accountB,
    middleware: {
      incoming: [tap()],
      outgoing: [tap()],
    },
    blockCodec,
    canSubscribe: true,
    canPublish: true,
    isCRDT: false,
  })
  const pubsubCharlie = await charlie.createTopicPubsub(topicPolygon, {
    from: accountC,
    middleware: {
      incoming: [tap()],
      outgoing: [tap()],
    },
    blockCodec,
    canSubscribe: true,
    canPublish: true,
    isCRDT: false,
  })

  subscribeNewBlocks(
    [
      {
        name: 'bsc',
        chainId: '56',
        rpc: 'wss://somerpc.server',
      },
    ],
    // async so that `await` is legal; passes the BSC topic declared above
    async (payload: any) => {
      await alice.putBlock(payload, { topic: topicBSC })
    },
  )

  subscribeNewBlocks(
    [
      {
        name: 'mainnet',
        chainId: '1',
        rpc: 'wss://somerpc.server',
      },
    ],
    // async so that `await` is legal; passes the Ethereum topic declared above
    async (payload: any) => {
      await bob.putBlock(payload, { topic: topicEthereum })
    },
  )
  subscribeNewBlocks(
    [
      {
        name: 'polygon',
        chainId: '137',
        rpc: 'wss://somerpc.server',
      },
    ],
    // async so that `await` is legal; passes the Polygon topic declared above
    async (payload: any) => {
      await charlie.putBlock(payload, { topic: topicPolygon })
    },
  )
})

Copyright IFESA 2022