Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
410 changes: 77 additions & 333 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async-graphql-axum = "7.0.6"
axum = "0.7.5"
chrono = "0.4.38"
clap = { version = "4.5.4", features = ["derive", "env"] }
derivative = "2.2.0"
diesel = { version = "2.1.3", features = ["postgres", "serde_json", "numeric", "r2d2", "chrono", "uuid"] }
diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
diesel-dynamic-schema = "0.2.1"
Expand Down
4 changes: 0 additions & 4 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,4 @@ anyhow = "1.0"

[dev-dependencies]
tower-test = { git = "https://github.com/tower-rs/tower.git" }
ipfs-api-backend-hyper = "0.6"
ipfs-api = { version = "0.17.0", features = [
"with-hyper-rustls",
], default-features = false }
uuid = { version = "1.9.1", features = ["v4"] }
105 changes: 45 additions & 60 deletions core/src/polling_monitor/ipfs_service.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Error};
use bytes::Bytes;
use graph::futures03::future::BoxFuture;
use graph::{
derive::CheapClone,
ipfs_client::{CidFile, IpfsClient},
prelude::CheapClone,
};
use std::time::Duration;
use graph::ipfs::ContentPath;
use graph::ipfs::IpfsClient;
use graph::ipfs::IpfsError;
use graph::{derive::CheapClone, prelude::CheapClone};
use tower::{buffer::Buffer, ServiceBuilder, ServiceExt};

const CLOUDFLARE_TIMEOUT: u16 = 524;
const GATEWAY_TIMEOUT: u16 = 504;

pub type IpfsService = Buffer<CidFile, BoxFuture<'static, Result<Option<Bytes>, Error>>>;
pub type IpfsService = Buffer<ContentPath, BoxFuture<'static, Result<Option<Bytes>, Error>>>;

pub fn ipfs_service(
client: IpfsClient,
client: Arc<dyn IpfsClient>,
max_file_size: usize,
timeout: Duration,
rate_limit: u16,
) -> IpfsService {
let ipfs = IpfsServiceInner {
client,
max_file_size,
timeout,
max_file_size,
};

let svc = ServiceBuilder::new()
Expand All @@ -38,37 +36,30 @@ pub fn ipfs_service(

#[derive(Clone, CheapClone)]
struct IpfsServiceInner {
client: IpfsClient,
max_file_size: usize,
client: Arc<dyn IpfsClient>,
timeout: Duration,
max_file_size: usize,
}

impl IpfsServiceInner {
async fn call_inner(self, req: CidFile) -> Result<Option<Bytes>, Error> {
let CidFile { cid, path } = req;
let multihash = cid.hash().code();
async fn call_inner(self, path: ContentPath) -> Result<Option<Bytes>, Error> {
let multihash = path.cid().hash().code();
if !SAFE_MULTIHASHES.contains(&multihash) {
return Err(anyhow!("CID multihash {} is not allowed", multihash));
}

let cid_str = match path {
Some(path) => format!("{}/{}", cid, path),
None => cid.to_string(),
};

let res = self
.client
.cat_all(&cid_str, Some(self.timeout), self.max_file_size)
.cat(&path, self.max_file_size, Some(self.timeout))
.await;

match res {
Ok(file_bytes) => Ok(Some(file_bytes)),
Err(e) => match e.status().map(|e| e.as_u16()) {
// Timeouts in IPFS mean the file is not available, so we return `None`
Some(GATEWAY_TIMEOUT) | Some(CLOUDFLARE_TIMEOUT) => return Ok(None),
_ if e.is_timeout() => return Ok(None),
_ => return Err(e.into()),
},
Err(IpfsError::RequestFailed(err)) if err.is_timeout() => {
// Timeouts in IPFS mean that the content is not available, so we return `None`.
Ok(None)
}
Err(err) => Err(err.into()),
}
}
}
Expand Down Expand Up @@ -96,48 +87,42 @@ const SAFE_MULTIHASHES: [u64; 15] = [

#[cfg(test)]
mod test {
use ipfs::IpfsApi;
use ipfs_api as ipfs;
use std::{fs, str::FromStr, time::Duration};
use std::time::Duration;

use graph::components::link_resolver::ArweaveClient;
use graph::components::link_resolver::ArweaveResolver;
use graph::data::value::Word;
use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing;
use graph::ipfs::IpfsRpcClient;
use graph::ipfs::ServerAddress;
use graph::tokio;
use tower::ServiceExt;

use cid::Cid;
use graph::{
components::link_resolver::{ArweaveClient, ArweaveResolver},
data::value::Word,
ipfs_client::IpfsClient,
tokio,
};

use uuid::Uuid;

use super::*;

#[tokio::test]
async fn cat_file_in_folder() {
let path = "./tests/fixtures/ipfs_folder";
let uid = Uuid::new_v4().to_string();
fs::write(format!("{}/random.txt", path), &uid).unwrap();
let random_bytes = Uuid::new_v4().as_bytes().to_vec();
let ipfs_file = ("dir/file.txt", random_bytes.clone());

let cl: ipfs::IpfsClient = ipfs::IpfsClient::default();
let add_resp = add_files_to_local_ipfs_node_for_testing([ipfs_file])
.await
.unwrap();

let rsp = cl.add_path(path).await.unwrap();
let dir_cid = add_resp.into_iter().find(|x| x.name == "dir").unwrap().hash;

let ipfs_folder = rsp.iter().find(|rsp| rsp.name == "ipfs_folder").unwrap();
let client =
IpfsRpcClient::new_unchecked(ServerAddress::local_rpc_api(), &graph::log::discard())
.unwrap()
.into_boxed();

let local = IpfsClient::localhost();
let cid = Cid::from_str(&ipfs_folder.hash).unwrap();
let file = "random.txt".to_string();
let svc = ipfs_service(client.into(), 100000, Duration::from_secs(30), 10);

let svc = super::ipfs_service(local, 100000, Duration::from_secs(5), 10);
let path = ContentPath::new(format!("{dir_cid}/file.txt")).unwrap();
let content = svc.oneshot(path).await.unwrap().unwrap();

let content = svc
.oneshot(super::CidFile {
cid,
path: Some(file),
})
.await
.unwrap()
.unwrap();
assert_eq!(content.to_vec(), uid.as_bytes().to_vec());
assert_eq!(content.to_vec(), random_bytes);
}

#[tokio::test]
Expand Down
6 changes: 3 additions & 3 deletions core/src/subgraph/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use graph::{
CausalityRegion, DataSource, DataSourceTemplate,
},
derive::CheapClone,
ipfs_client::CidFile,
ipfs::ContentPath,
prelude::{
BlockNumber, BlockPtr, BlockState, CancelGuard, CheapClone, DeploymentHash,
MetricsRegistry, RuntimeHostBuilder, SubgraphCountMetric, SubgraphInstanceMetrics,
Expand Down Expand Up @@ -228,8 +228,8 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
}

pub struct OffchainMonitor {
ipfs_monitor: PollingMonitor<CidFile>,
ipfs_monitor_rx: mpsc::UnboundedReceiver<(CidFile, Bytes)>,
ipfs_monitor: PollingMonitor<ContentPath>,
ipfs_monitor_rx: mpsc::UnboundedReceiver<(ContentPath, Bytes)>,
arweave_monitor: PollingMonitor<Base64>,
arweave_monitor_rx: mpsc::UnboundedReceiver<(Base64, Bytes)>,
}
Expand Down
4 changes: 3 additions & 1 deletion graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ atomic_refcell = "0.1.13"
old_bigdecimal = { version = "=0.1.2", features = ["serde"], package = "bigdecimal" }
bytes = "1.0.1"
cid = "0.11.1"
derivative = { workspace = true }
graph_derive = { path = "./derive" }
diesel = { workspace = true }
diesel_derives = { workspace = true }
Expand Down Expand Up @@ -90,7 +91,7 @@ defer = "0.2"
# Our fork contains patches to make some fields optional for Celo and Fantom compatibility.
# Without the "arbitrary_precision" feature, we get the error `data did not match any variant of untagged enum Response`.
web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-patches-onto-0.18", features = [
"arbitrary_precision","test"
"arbitrary_precision", "test"
] }
serde_plain = "1.0.2"
csv = "1.3.0"
Expand All @@ -100,6 +101,7 @@ object_store = { version = "0.10.1", features = ["gcp"] }
clap.workspace = true
maplit = "1.0.2"
hex-literal = "0.4"
wiremock = "0.6.1"

[build-dependencies]
tonic-build = { workspace = true }
Loading